Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2004-12-29 06:44:54 +0000
committerslewis2004-12-29 06:44:54 +0000
commitfeed39c1ad5ed5d9b34b8020e12a27a23bab7c72 (patch)
treec11350dac0a19133196a1ef430db6150d445608a
parent78e0981466e30e04369bb909a1cf8aefa14a3ce9 (diff)
downloadorg.eclipse.ecf-feed39c1ad5ed5d9b34b8020e12a27a23bab7c72.tar.gz
org.eclipse.ecf-feed39c1ad5ed5d9b34b8020e12a27a23bab7c72.tar.xz
org.eclipse.ecf-feed39c1ad5ed5d9b34b8020e12a27a23bab7c72.zip
Initial checkin of org.eclipse.ecf.provider plugin
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/.classpath7
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/.cvsignore1
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/.project28
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/META-INF/MANIFEST.MF24
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/build.properties4
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/plugin.xml35
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Debug.java28
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/ProviderPlugin.java69
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java18
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java563
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java27
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java16
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java54
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java45
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IClientSocketFactory.java8
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IServerSocketFactory.java9
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ISocketAcceptHandler.java7
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java8
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java8
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java93
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SocketFactory.java50
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java13
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java288
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerInstantiator.java32
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java166
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java119
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConfig.java84
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConnector.java116
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java675
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerConfig.java49
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGMM.java323
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGroup.java45
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContext.java218
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java264
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java294
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java230
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java43
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainer.java97
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java140
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/events/SharedObjectCallEvent.java42
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/GMMImpl.java80
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/Member.java53
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/MemberChanged.java19
43 files changed, 4492 insertions, 0 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/.classpath b/framework/bundles/org.eclipse.ecf.provider/.classpath
new file mode 100644
index 000000000..065ac06e1
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src"/>
+ <classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
diff --git a/framework/bundles/org.eclipse.ecf.provider/.cvsignore b/framework/bundles/org.eclipse.ecf.provider/.cvsignore
new file mode 100644
index 000000000..ba077a403
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/.cvsignore
@@ -0,0 +1 @@
+bin
diff --git a/framework/bundles/org.eclipse.ecf.provider/.project b/framework/bundles/org.eclipse.ecf.provider/.project
new file mode 100644
index 000000000..0a235b6a0
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/.project
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>org.eclipse.ecf.provider</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.ManifestBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.SchemaBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.pde.PluginNature</nature>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
diff --git a/framework/bundles/org.eclipse.ecf.provider/META-INF/MANIFEST.MF b/framework/bundles/org.eclipse.ecf.provider/META-INF/MANIFEST.MF
new file mode 100644
index 000000000..532345de5
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/META-INF/MANIFEST.MF
@@ -0,0 +1,24 @@
+Manifest-Version: 1.0
+Bundle-Name: Eclipse Communications Framework Provider Plug-in
+Bundle-SymbolicName: org.eclipse.ecf.provider; singleton=true
+Bundle-Version: 0.0.1
+Bundle-ClassPath: provider.jar
+Bundle-Activator: org.eclipse.ecf.provider.ProviderPlugin
+Bundle-Vendor: eclipse.org
+Bundle-Localization: plugin
+Require-Bundle: org.eclipse.core.runtime,
+ org.eclipse.ecf
+Eclipse-AutoStart: true
+DynamicImport-Package: *
+Provide-Package: org.eclipse.ecf.provider,
+org.eclipse.ecf.provider.comm.tcp
+org.eclipse.ecf.provider.generic,
+org.eclipse.ecf.provider.generic.events
+org.eclipse.ecf.provider.generic.gmm
+Bundle-SymbolicName: org.eclipse.ecf.provider
+Bundle-Version: 0.0.1
+Bundle-Name: ECF - Provider Plugin
+Bundle-Vendor: Eclipse.org
+Bundle-Activator: org.eclipse.ecf.provider.ProviderPlugin
+
+
diff --git a/framework/bundles/org.eclipse.ecf.provider/build.properties b/framework/bundles/org.eclipse.ecf.provider/build.properties
new file mode 100644
index 000000000..4955ccc98
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/build.properties
@@ -0,0 +1,4 @@
+source.provider.jar = src/
+output.provider.jar = bin/
+bin.includes = plugin.xml,\
+ provider.jar
diff --git a/framework/bundles/org.eclipse.ecf.provider/plugin.xml b/framework/bundles/org.eclipse.ecf.provider/plugin.xml
new file mode 100644
index 000000000..1b3e5ea30
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/plugin.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?eclipse version="3.0"?>
+<plugin
+ id="org.eclipse.ecf.provider"
+ name="ECF Provider Plug-in"
+ version="0.0.1"
+ provider-name="eclipse.org"
+ class="org.eclipse.ecf.provider.ProviderPlugin">
+
+ <runtime>
+ <library name="provider.jar">
+ <export name="*"/>
+ </library>
+ </runtime>
+
+ <requires>
+ <import plugin="org.eclipse.core.runtime"/>
+ <import plugin="org.eclipse.ecf"/>
+ </requires>
+ <extension
+ point="org.eclipse.ecf.containerFactory">
+ <containerFactory
+ class="org.eclipse.ecf.provider.generic.ContainerInstantiator"
+ description="Generic container implementation (server and client)"
+ name="generic"/>
+ </extension>
+ <extension
+ point="org.eclipse.ecf.comm">
+ <connection
+ description="TCP Client Connection"
+ name="tcpclient"
+ instantiatorClass="org.eclipse.ecf.provider.comm.tcp.Client$Creator"/>
+ </extension>
+
+</plugin>
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Debug.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Debug.java
new file mode 100644
index 000000000..497eed1d3
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Debug.java
@@ -0,0 +1,28 @@
+package org.eclipse.ecf.provider;
+
+public class Debug {
+
+ public static boolean ON = false;
+
+ public static Debug create(String key) {
+ return new Debug(key);
+ }
+
+ public static void errDumpStack(Throwable e, String msg) {
+ System.err.println(msg);
+ e.printStackTrace(System.err);
+ }
+ public void dumpStack(Throwable e, String msg) {
+ System.err.println(msg);
+ e.printStackTrace(System.err);
+ }
+ public void msg(String msg) {
+ System.err.println(msg);
+ }
+ protected Debug(String str) {
+ }
+ public static void setThreadDebugGroup(Object obj) {
+ // Do nothing
+ }
+
+}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/ProviderPlugin.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/ProviderPlugin.java
new file mode 100644
index 000000000..ad41dba1f
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/ProviderPlugin.java
@@ -0,0 +1,69 @@
+package org.eclipse.ecf.provider;
+
+import org.eclipse.core.runtime.Plugin;
+import org.osgi.framework.BundleContext;
+import java.util.*;
+
+/**
+ * The main plugin class to be used in the desktop.
+ */
+public class ProviderPlugin extends Plugin {
+ //The shared instance.
+ private static ProviderPlugin plugin;
+ //Resource bundle.
+ private ResourceBundle resourceBundle;
+
+ /**
+ * The constructor.
+ */
+ public ProviderPlugin() {
+ super();
+ plugin = this;
+ try {
+ resourceBundle = ResourceBundle.getBundle("org.eclipse.ecf.provider.ProviderPluginResources");
+ } catch (MissingResourceException x) {
+ resourceBundle = null;
+ }
+ }
+
+ /**
+ * This method is called upon plug-in activation
+ */
+ public void start(BundleContext context) throws Exception {
+ super.start(context);
+ }
+
+ /**
+ * This method is called when the plug-in is stopped
+ */
+ public void stop(BundleContext context) throws Exception {
+ super.stop(context);
+ }
+
+ /**
+ * Returns the shared instance.
+ */
+ public static ProviderPlugin getDefault() {
+ return plugin;
+ }
+
+ /**
+ * Returns the string from the plugin's resource bundle,
+ * or 'key' if not found.
+ */
+ public static String getResourceString(String key) {
+ ResourceBundle bundle = ProviderPlugin.getDefault().getResourceBundle();
+ try {
+ return (bundle != null) ? bundle.getString(key) : key;
+ } catch (MissingResourceException e) {
+ return key;
+ }
+ }
+
+ /**
+ * Returns the plugin's resource bundle,
+ */
+ public ResourceBundle getResourceBundle() {
+ return resourceBundle;
+ }
+}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java
new file mode 100644
index 000000000..b7d20717c
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java
@@ -0,0 +1,18 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.Serializable;
+
+public class AsynchMessage implements Serializable {
+
+ Serializable data;
+
+ protected AsynchMessage() {
+ }
+
+ protected AsynchMessage(Serializable data) {
+ this.data = data;
+ }
+ Serializable getData() {
+ return data;
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java
new file mode 100644
index 000000000..8ad25edf7
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java
@@ -0,0 +1,563 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Vector;
+
+import org.eclipse.ecf.core.comm.AsynchConnectionEvent;
+import org.eclipse.ecf.core.comm.ConnectionEvent;
+import org.eclipse.ecf.core.comm.ConnectionInstantiationException;
+import org.eclipse.ecf.core.comm.DisconnectConnectionEvent;
+import org.eclipse.ecf.core.comm.IConnectionEventHandler;
+import org.eclipse.ecf.core.comm.ISynchAsynchConnection;
+import org.eclipse.ecf.core.comm.ISynchAsynchConnectionEventHandler;
+import org.eclipse.ecf.core.comm.SynchConnectionEvent;
+import org.eclipse.ecf.core.comm.provider.ISynchAsynchConnectionInstantiator;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.identity.IDFactory;
+import org.eclipse.ecf.core.util.SimpleQueueImpl;
+import org.eclipse.ecf.provider.Debug;
+
+public final class Client implements ISynchAsynchConnection {
+
+ public static class Creator implements ISynchAsynchConnectionInstantiator {
+ public ISynchAsynchConnection makeInstance(ISynchAsynchConnectionEventHandler handler, Class[] clazzes,
+ Object[] args) throws ConnectionInstantiationException {
+ try {
+ Integer ka = new Integer(0);
+ if (args.length > 0) {
+ ka = (Integer) args[0];
+ }
+ return new Client(handler,
+ ka);
+ } catch (RuntimeException e) {
+ throw new ConnectionInstantiationException(
+ "Exception in creating connection "+Client.class.getName(), e);
+ }
+ }
+
+ }
+ public static final String PROTOCOL = "ecftcp";
+
+ public static final Debug debug = Debug.create(Client.class.getName());
+
+ public static final int SNDR_PRIORITY = Thread.NORM_PRIORITY;
+ public static final int RCVR_PRIORITY = Thread.NORM_PRIORITY;
+ // Default close timeout is 1.5 seconds
+ public static final long CLOSE_TIMEOUT = 1500;
+
+ public static final int DEF_MAX_MSG = 50;
+
+ protected String address;
+ protected int port;
+ protected Socket socket;
+ protected ObjectOutputStream outputStream;
+ protected ObjectInputStream inputStream;
+
+ protected ISynchAsynchConnectionEventHandler handler;
+ protected SimpleQueueImpl queue = new SimpleQueueImpl();
+ protected int keepAlive = 0;
+
+ protected Thread sendThread;
+ protected Thread rcvThread;
+ protected Thread keepAliveThread;
+
+ protected boolean isClosing = false;
+ protected boolean waitForPing = false;
+ protected PingMessage ping = new PingMessage();
+ protected PingResponseMessage pingResp = new PingResponseMessage();
+ protected long nextPingTime;
+
+ protected int maxMsg = DEF_MAX_MSG;
+ protected long closeTimeout = CLOSE_TIMEOUT;
+
+ protected Vector eventNotify = null;
+
+ protected Map properties;
+
+ public Map getProperties() {
+ return properties;
+ }
+ public void setProperties(Map props) {
+ this.properties = props;
+ }
+ public Client(Socket aSocket, ObjectInputStream iStream,
+ ObjectOutputStream oStream,
+ ISynchAsynchConnectionEventHandler handler, int keepAlive)
+ throws IOException {
+ this(aSocket, iStream, oStream, handler, keepAlive, DEF_MAX_MSG);
+ }
+ public Client(Socket aSocket, ObjectInputStream iStream,
+ ObjectOutputStream oStream,
+ ISynchAsynchConnectionEventHandler handler, int keepAlive,
+ int maxmsgs) throws IOException {
+ socket = aSocket;
+ // Set TCP no delay
+ socket.setTcpNoDelay(true);
+ address = socket.getInetAddress().getHostName();
+ port = socket.getPort();
+ inputStream = iStream;
+ outputStream = oStream;
+ this.handler = handler;
+ this.keepAlive = keepAlive;
+ maxMsg = maxmsgs;
+ properties = new Properties();
+ setupThreads();
+ }
+ public Client(ISynchAsynchConnectionEventHandler handler, Integer keepAlive) {
+ this(handler, keepAlive.intValue());
+ }
+ public Client(ISynchAsynchConnectionEventHandler handler, int keepAlive) {
+ this(handler, keepAlive, DEF_MAX_MSG);
+ }
+ public Client(ISynchAsynchConnectionEventHandler handler, int keepAlive,
+ int maxmsgs) {
+ this.handler = handler;
+ this.keepAlive = keepAlive;
+ maxMsg = maxmsgs;
+ this.properties = new Properties();
+ }
+
+ public synchronized ID getLocalID() {
+ if (socket == null)
+ return null;
+ ID retID = null;
+ try {
+ retID = IDFactory.makeStringID(PROTOCOL + "://"
+ + socket.getLocalAddress().getHostName() + ":" + port);
+ } catch (Exception e) {
+ return null;
+ }
+ return retID;
+ }
+ public synchronized void removeCommEventListener(IConnectionEventHandler l) {
+ eventNotify.remove(l);
+ }
+ public synchronized void addCommEventListener(IConnectionEventHandler l) {
+ if (eventNotify == null) {
+ eventNotify = new Vector();
+ }
+ eventNotify.add(l);
+ }
+ public synchronized boolean isConnected() {
+ if (socket != null) {
+ return socket.isConnected();
+ }
+ return false;
+ }
+ public synchronized boolean isStarted() {
+ if (sendThread != null) {
+ return sendThread.isAlive();
+ }
+ return false;
+ }
+
+ protected void fireSuspect(Exception e) {
+ Vector v = null;
+ synchronized (this) {
+ if (eventNotify == null)
+ return;
+ v = eventNotify;
+ }
+ for (Enumeration enum = v.elements(); enum.hasMoreElements();) {
+ IConnectionEventHandler h = (IConnectionEventHandler) enum
+ .nextElement();
+ h.handleSuspectEvent(new ConnectionEvent(this, e));
+ }
+ }
+
+ public synchronized Object connect(ID remote, Object data, int timeout)
+ throws IOException {
+ if (socket != null)
+ throw new ConnectException("Client already connected");
+
+ URI anURI = null;
+ try {
+ anURI = remote.toURI();
+ } catch (URISyntaxException e) {
+ throw new IOException("Can't connect to address "
+ + remote.getName() + ". Invalid URL");
+ }
+
+ address = anURI.getHost();
+ port = anURI.getPort();
+
+ SocketFactory fact = SocketFactory.getSocketFactory();
+ if (fact == null) {
+ fact = SocketFactory.getDefaultSocketFactory();
+ }
+ if (Debug.ON && debug != null) {
+ debug.msg("Connecting to " + address + ":" + port);
+ }
+ // Actually connect to remote using socket from socket factory.
+ socket = fact.createSocket(address, port, timeout);
+ // Set TCP no delay
+ socket.setTcpNoDelay(true);
+ boolean compatibility = false;
+ //boolean compatibility = TCPCompatibility.useCompatibility;
+ outputStream = new ExObjectOutputStream(new BufferedOutputStream(socket
+ .getOutputStream()), compatibility);
+ outputStream.flush();
+ inputStream = new ExObjectInputStream(socket.getInputStream(),
+ compatibility);
+ // send connect data
+ sendIt(new ConnectRequestMessage(anURI, (Serializable) data));
+
+ ConnectResultMessage res = null;
+ res = (ConnectResultMessage) readObject();
+ // Setup threads
+ setupThreads();
+ // Return results.
+ return res.getData();
+ }
+
+ private void setupThreads() {
+ // Setup threads
+ sendThread = (Thread) AccessController
+ .doPrivileged(new PrivilegedAction() {
+ public Object run() {
+ return getSendThread();
+ }
+ });
+ rcvThread = (Thread) AccessController
+ .doPrivileged(new PrivilegedAction() {
+ public Object run() {
+ return getRcvThread();
+ }
+ });
+ }
+
+ private Thread getSendThread() {
+ Thread aThread = new Thread(new Runnable() {
+ public void run() {
+ int msgCount = 0;
+ Thread me = Thread.currentThread();
+ // Loop until done sending messages
+ for (;;) {
+ if (me.isInterrupted())
+ break;
+ Serializable aMsg = (Serializable) queue.peekQueue();
+ if (me.isInterrupted() || aMsg == null)
+ break;
+ try {
+ // Actually send message
+ sendIt(aMsg);
+ // Successful...remove message from queue
+ queue.removeHead();
+
+ if (msgCount > maxMsg) {
+ synchronized (outputStream) {
+ outputStream.reset();
+ }
+ msgCount = 0;
+ } else
+ msgCount++;
+ } catch (IOException e) {
+ // Log to stderr
+ Debug.errDumpStack(e, "Exception in sender thread for "
+ + address + ":" + port);
+
+ if (isClosing) {
+ isClosing = false;
+ synchronized (Client.this) {
+ Client.this.notifyAll();
+ }
+ } else {
+ if (handler.handleSuspectEvent(new ConnectionEvent(
+ Client.this, e))) {
+ handler
+ .handleDisconnectEvent(new DisconnectConnectionEvent(
+ Client.this, e, queue));
+ }
+ }
+ break;
+ }
+ }
+ if (Debug.ON && debug != null) {
+ debug.msg("Sndr for " + address + ":" + port
+ + " terminating.");
+ }
+ }
+ }, "Sndr for " + address + ":" + port);
+ // Set priority for new thread
+ aThread.setPriority(SNDR_PRIORITY);
+ return aThread;
+ }
+
+ private void closeSocket() {
+ // Close socket
+ try {
+ if (socket != null)
+ socket.close();
+ } catch (IOException e) {
+ }
+ }
+
+ private void sendIt(Serializable snd) throws IOException {
+ // Write object to output stream
+ synchronized (outputStream) {
+ outputStream.writeObject(snd);
+ outputStream.flush();
+ nextPingTime = System.currentTimeMillis() + keepAlive;
+ }
+ }
+
+ private void receiveResp() {
+ synchronized (outputStream) {
+ waitForPing = false;
+ nextPingTime = System.currentTimeMillis() + keepAlive;
+ outputStream.notifyAll();
+ }
+ }
+
+ public void setCloseTimeout(long t) {
+ closeTimeout = t;
+ }
+
+ private void sendClose(Serializable snd) throws IOException {
+ isClosing = true;
+ sendIt(snd);
+ if (isClosing) {
+ try {
+ wait(closeTimeout);
+ } catch (Exception e) {
+ }
+ }
+ // Before returning, actually remove remote objects
+ handler.handleDisconnectEvent(new DisconnectConnectionEvent(
+ Client.this, null, queue));
+ }
+
+ private Thread getRcvThread() {
+ Thread aThread = new Thread(new Runnable() {
+ public void run() {
+ Thread me = Thread.currentThread();
+ // Loop forever and handle objects received.
+ for (;;) {
+ if (me.isInterrupted())
+ break;
+ try {
+ handleRcv(readObject());
+ } catch (IOException e) {
+ // Log to stderr
+ Debug.errDumpStack(e, "Exception in read thread for "
+ + address + ":" + port);
+
+ if (Debug.ON && debug != null) {
+ debug.dumpStack(e, "Exception in read thread for "
+ + address + ":" + port + ": "
+ + e.getMessage());
+ }
+ if (isClosing) {
+ isClosing = false;
+ synchronized (Client.this) {
+ Client.this.notifyAll();
+ }
+ } else {
+ if (handler.handleSuspectEvent(new ConnectionEvent(
+ Client.this, e))) {
+ handler
+ .handleDisconnectEvent(new DisconnectConnectionEvent(
+ Client.this, e, queue));
+ }
+ }
+ break;
+ }
+ }
+ if (Debug.ON && debug != null) {
+ debug.msg("Rcvr for " + address + ":" + port
+ + " terminating.");
+ }
+ }
+ }, "Rcvr for " + address + ":" + port);
+ // Set priority and return
+ aThread.setPriority(RCVR_PRIORITY);
+ return aThread;
+ }
+
+ private void handleRcv(Serializable rcv) throws IOException {
+ try {
+ // We've received some data, so the connection is alive
+ receiveResp();
+ // Handle all messages
+ if (rcv instanceof SynchMessage) {
+ // Handle synch message. The only valid synch message is
+ // 'close'.
+ handler.handleSynchEvent(new SynchConnectionEvent(this,
+ ((SynchMessage) rcv).getData()));
+ } else if (rcv instanceof AsynchMessage) {
+ Serializable d = ((AsynchMessage) rcv).getData();
+ // Handle asynch messages.
+ handler.handleAsynchEvent(new AsynchConnectionEvent(this, d));
+ } else if (rcv instanceof PingMessage) {
+ // Handle ping by sending response back immediately
+ sendIt(pingResp);
+ } else if (rcv instanceof PingResponseMessage) {
+ // Do nothing with ping response
+ } else
+ throw new IOException("Invalid packet received.");
+ } catch (IOException e) {
+ disconnect();
+ throw e;
+ }
+ }
+
+ public synchronized void start() {
+ if (sendThread != null)
+ sendThread.start();
+ if (rcvThread != null)
+ rcvThread.start();
+ // Setup and start keep alive thread
+ if (keepAlive != 0)
+ keepAliveThread = setupPing();
+ if (keepAliveThread != null)
+ keepAliveThread.start();
+ }
+ public void stop() {
+ }
+ private Thread setupPing() {
+ return new Thread(new Runnable() {
+ public void run() {
+ Thread me = Thread.currentThread();
+ while (!queue.isStopped()) {
+ try {
+ if (me.isInterrupted())
+ break;
+ // Sleep for timeout interval divided by two
+ Thread.sleep(keepAlive / 2);
+ if (me.isInterrupted())
+ break;
+ // Check to see how long it has been since our last
+ // send.
+ synchronized (outputStream) {
+ if (System.currentTimeMillis() >= nextPingTime) {
+ // If it's been longer than our timeout
+ // interval, then ping
+ waitForPing = true;
+ // Actually send ping instance
+ sendIt(ping);
+ if (waitForPing) {
+ try {
+ // Wait for keepAliveInterval for
+ // pingresp
+ outputStream.wait(keepAlive / 2);
+ } catch (InterruptedException e) {
+ }
+ }
+ if (waitForPing) {
+ throw new IOException(address + ":" + port
+ + " not reachable.");
+ }
+ }
+ }
+ } catch (Exception e) {
+ // Log to stderr
+ Debug.errDumpStack(e, "Exception in ping thread for "
+ + address + ":" + port);
+
+ if (Debug.ON && debug != null) {
+ debug.dumpStack(e, "Exception in ping.");
+ }
+ if (isClosing) {
+ isClosing = false;
+ synchronized (Client.this) {
+ Client.this.notifyAll();
+ }
+ } else {
+ if (handler.handleSuspectEvent(new ConnectionEvent(
+ Client.this, e))) {
+ handler
+ .handleDisconnectEvent(new DisconnectConnectionEvent(
+ Client.this, e, queue));
+ }
+ }
+ break;
+ }
+ }
+ if (Debug.ON && debug != null) {
+ debug.msg("Keepalive terminating.");
+ }
+ }
+ }, "Keepalive " + address + ":" + port);
+ }
+
+ public synchronized void disconnect() throws IOException {
+ if (Debug.ON && debug != null) {
+ debug.msg("disconnect()");
+ }
+ // Close send queue and socket
+ queue.close();
+ closeSocket();
+ // Notify sender in case it's waiting for a response
+ // Zap keep alive thread
+ if (keepAliveThread != null) {
+ keepAliveThread.interrupt();
+ keepAliveThread = null;
+ }
+ if (sendThread != null) {
+ sendThread.interrupt();
+ sendThread = null;
+ }
+ if (rcvThread != null) {
+ rcvThread.interrupt();
+ rcvThread = null;
+ }
+ // Notify any threads waiting to get hold of our lock
+ notifyAll();
+ }
+
+ public void sendAsynch(ID recipient, byte[] obj) throws IOException {
+ queueObject(recipient, obj);
+ }
+ public void sendAsynch(ID recipien, Object obj) throws IOException {
+ queueObject(recipien, (Serializable) obj);
+ }
+ public synchronized void queueObject(ID recipient, Serializable obj)
+ throws IOException {
+ if (queue.isStopped() || isClosing)
+ throw new ConnectException("Not connected");
+ queue.enqueue(new AsynchMessage(obj));
+ }
+ public synchronized Serializable sendObject(ID recipient, Serializable obj)
+ throws IOException {
+ if (queue.isStopped() || isClosing)
+ throw new ConnectException("Not connected");
+ sendClose(new SynchMessage(obj));
+ return null;
+ }
+
+ public Object sendSynch(ID rec, Object obj) throws IOException {
+ return sendObject(rec, (Serializable) obj);
+ }
+
+ public Object sendSynch(ID rec, byte[] obj) throws IOException {
+ return sendObject(rec, obj);
+ }
+ private Serializable readObject() throws IOException {
+ Serializable ret = null;
+ try {
+ ret = (Serializable) inputStream.readObject();
+ } catch (ClassNotFoundException e) {
+ if (Debug.ON && debug != null) {
+ debug.dumpStack(e, "Class not found exception");
+ }
+ throw new IOException(
+ "Protocol violation due to class load failure. "
+ + e.getMessage());
+ }
+ return ret;
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java
new file mode 100644
index 000000000..4b3b548a1
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java
@@ -0,0 +1,27 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.Serializable;
+import java.net.URI;
+
+public class ConnectRequestMessage implements Serializable {
+
+ URI target;
+ Serializable data;
+
+ public ConnectRequestMessage(URI target, Serializable data) {
+ this.target = target;
+ this.data = data;
+ }
+
+ public URI getTarget() {
+ return target;
+ }
+
+ public Serializable getData() {
+ return data;
+ }
+
+ public String toString() {
+ return "ConnectRequestMessage[target:" + target + ";data:" + data + "]";
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java
new file mode 100644
index 000000000..56ee36524
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java
@@ -0,0 +1,16 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.Serializable;
+
+public class ConnectResultMessage implements Serializable {
+
+ Serializable data;
+
+ public ConnectResultMessage(Serializable data) {
+ this.data = data;
+ }
+
+ public Serializable getData() {
+ return data;
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java
new file mode 100644
index 000000000..4f6853a59
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java
@@ -0,0 +1,54 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+
+import org.eclipse.ecf.provider.Debug;
+
+public class ExObjectInputStream extends ObjectInputStream {
+
+ private boolean replace = false;
+
+ public static final Debug debug = Debug.create(ExObjectInputStream.class
+ .getName());
+
+ public ExObjectInputStream(InputStream in) throws IOException,
+ SecurityException {
+ super(in);
+ if (Debug.ON && debug != null) {
+ debug.msg("ExObjectInputStream(" + in + ")");
+ }
+ }
+
+ public ExObjectInputStream(InputStream in, boolean backwardCompatibility)
+ throws IOException, SecurityException {
+ super(in);
+ if (Debug.ON && debug != null) {
+ debug.msg("ExObjectInputStream(" + in + "," + backwardCompatibility
+ + ")");
+ }
+ if (backwardCompatibility) {
+ try {
+ super.enableResolveObject(true);
+ replace = true;
+ debug("ExObjectInputStream.compatibility set");
+ } catch (Exception e) {
+ throw new IOException(
+ "Could not setup backward compatibility object replacers for ExObjectInputStream");
+ }
+ }
+ }
+
+ protected void debug(String msg) {
+ if (Debug.ON && debug != null) {
+ debug.msg(msg);
+ }
+ }
+ protected void debug(String msg, Throwable t) {
+ if (Debug.ON && debug != null) {
+ debug.dumpStack(t, msg);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java
new file mode 100644
index 000000000..ffb02010c
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java
@@ -0,0 +1,45 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+import org.eclipse.ecf.provider.Debug;
+
+public class ExObjectOutputStream extends ObjectOutputStream {
+
+ private boolean replace = false;
+
+ public static final Debug debug = Debug.create(ExObjectOutputStream.class
+ .getName());
+
+ public ExObjectOutputStream(OutputStream out) throws IOException {
+ super(out);
+ debug("ExObjectOutputStream(" + out + ")");
+ }
+
+ public ExObjectOutputStream(OutputStream out, boolean backwardCompatibility)
+ throws IOException, SecurityException {
+ this(out);
+ if (backwardCompatibility) {
+ try {
+ super.enableReplaceObject(true);
+ replace = true;
+ debug("ExObjectOutputStream.compatibility set");
+ } catch (Exception e) {
+ throw new IOException(
+ "Could not setup backward compatibility object replacers for ExObjectOutputStream");
+ }
+ }
+ }
+ protected void debug(String msg) {
+ if (Debug.ON && debug != null) {
+ debug.msg(msg);
+ }
+ }
+ protected void debug(String msg, Throwable t) {
+ if (Debug.ON && debug != null) {
+ debug.dumpStack(t, msg);
+ }
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IClientSocketFactory.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IClientSocketFactory.java
new file mode 100644
index 000000000..46e775ee7
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IClientSocketFactory.java
@@ -0,0 +1,8 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+public interface IClientSocketFactory {
+ Socket createSocket(String name, int port, int timeout) throws IOException;
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IServerSocketFactory.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IServerSocketFactory.java
new file mode 100644
index 000000000..ab6a443f8
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IServerSocketFactory.java
@@ -0,0 +1,9 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+public interface IServerSocketFactory {
+ ServerSocket createServerSocket(int port, int backlog) throws IOException;
+}
+
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ISocketAcceptHandler.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ISocketAcceptHandler.java
new file mode 100644
index 000000000..74a5f4eea
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ISocketAcceptHandler.java
@@ -0,0 +1,7 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.net.Socket;
+
+public interface ISocketAcceptHandler {
+ public void handleAccept(Socket aSocket) throws Exception;
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java
new file mode 100644
index 000000000..f17b7ebd2
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java
@@ -0,0 +1,8 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.Serializable;
+
+public class PingMessage implements Serializable {
+ protected PingMessage() {
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java
new file mode 100644
index 000000000..d262b1fb3
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java
@@ -0,0 +1,8 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.Serializable;
+
+public class PingResponseMessage implements Serializable {
+ protected PingResponseMessage() {
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java
new file mode 100644
index 000000000..694cfabcb
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java
@@ -0,0 +1,93 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.eclipse.ecf.provider.Debug;
+
+public class Server extends ServerSocket {
+ public static Debug debug = Debug.create(Server.class.getName());
+
+ ISocketAcceptHandler acceptHandler;
+
+ Thread listenerThread;
+ ThreadGroup threadGroup;
+
+ public Server(ThreadGroup group, int port, ISocketAcceptHandler handler)
+ throws IOException {
+ super(port);
+ if (handler == null)
+ throw new InstantiationError("Listener cannot be null");
+ acceptHandler = handler;
+ threadGroup = group;
+ listenerThread = setupListener();
+ listenerThread.start();
+ }
+
+ public Server(int port, ISocketAcceptHandler handler) throws IOException {
+ this(null, port, handler);
+ }
+
+ protected Thread setupListener() {
+ return new Thread(threadGroup, new Runnable() {
+ public void run() {
+ while (true) {
+ try {
+ handleAccept(accept());
+ } catch (Exception e) {
+ if (Debug.ON && debug != null) {
+ debug.dumpStack(e, "Exception in accept");
+ }
+ // If we get an exception on accept(), we should just
+ // exit
+ break;
+ }
+ }
+ if (Debug.ON && debug != null) {
+ debug.msg("Closing listener normally.");
+ }
+ }
+ }, "Server(" + getLocalPort() + ")");
+ }
+
+ protected void handleAccept(final Socket aSocket) {
+ new Thread(threadGroup, new Runnable() {
+ public void run() {
+ try {
+ acceptHandler.handleAccept(aSocket);
+ } catch (Exception e) {
+ if (Debug.ON && debug != null) {
+ debug.dumpStack(e,
+ "Unexplained exception in connect. Closing.");
+ }
+ try {
+ aSocket.close();
+ } catch (IOException e1) {
+ }
+ ;
+ } finally {
+ if (Debug.ON && debug != null) {
+ debug.msg("handleAcceptAsych terminating.");
+ }
+ }
+ }
+ }).start();
+ }
+
+ public synchronized void close() throws IOException {
+ if (Debug.ON && debug != null) {
+ debug.msg("close()");
+ }
+ super.close();
+ if (listenerThread != null) {
+ listenerThread.interrupt();
+ listenerThread = null;
+ }
+ if (threadGroup != null) {
+ threadGroup.interrupt();
+ threadGroup = null;
+ }
+ acceptHandler = null;
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SocketFactory.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SocketFactory.java
new file mode 100644
index 000000000..6e7f437ce
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SocketFactory.java
@@ -0,0 +1,50 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class SocketFactory implements IClientSocketFactory,
+ IServerSocketFactory {
+
+ protected static SocketFactory defaultFactory;
+ protected static SocketFactory factory = null;
+
+ public Socket createSocket(String name, int port, int timeout)
+ throws IOException {
+ if (factory != null) {
+ return factory.createSocket(name, port, timeout);
+ } else {
+ Socket s = new Socket();
+ s.connect(new InetSocketAddress(name, port), timeout);
+ return s;
+ }
+ }
+
+ public ServerSocket createServerSocket(int port, int backlog)
+ throws IOException {
+ if (factory != null) {
+ return factory.createServerSocket(port, backlog);
+ } else
+ return new ServerSocket(port, backlog);
+ }
+
+ public static synchronized SocketFactory getSocketFactory() {
+ return factory;
+ }
+
+ public synchronized static SocketFactory getDefaultSocketFactory() {
+ if (defaultFactory == null) {
+ defaultFactory = new SocketFactory();
+ }
+ return defaultFactory;
+ }
+
+ public synchronized static void setSocketFactory(SocketFactory fact) {
+ if (!fact.equals(defaultFactory)) {
+ factory = fact;
+ }
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java
new file mode 100644
index 000000000..e7c6d8872
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java
@@ -0,0 +1,13 @@
+package org.eclipse.ecf.provider.comm.tcp;
+
+import java.io.Serializable;
+
+public class SynchMessage extends AsynchMessage {
+
+ protected SynchMessage(Serializable data) {
+ super(data);
+ }
+ protected SynchMessage() {
+ super();
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java
new file mode 100644
index 000000000..679f5d375
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java
@@ -0,0 +1,288 @@
+package org.eclipse.ecf.provider.generic;
+
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.io.Serializable;
+import java.net.ConnectException;
+
+import org.eclipse.ecf.core.ISharedObjectContainerConfig;
+import org.eclipse.ecf.core.SharedObjectContainerJoinException;
+import org.eclipse.ecf.core.SharedObjectDescription;
+import org.eclipse.ecf.core.comm.AsynchConnectionEvent;
+import org.eclipse.ecf.core.comm.ConnectionInstantiationException;
+import org.eclipse.ecf.core.comm.DisconnectConnectionEvent;
+import org.eclipse.ecf.core.comm.IAsynchConnection;
+import org.eclipse.ecf.core.comm.IConnection;
+import org.eclipse.ecf.core.comm.ISynchAsynchConnection;
+import org.eclipse.ecf.core.comm.SynchConnectionEvent;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.provider.generic.gmm.Member;
+
+public abstract class ClientSOContainer extends SOContainer {
+ ISynchAsynchConnection connection;
+ ID remoteServerID;
+ byte connectionState;
+ public static final byte UNCONNECTED = 0;
+ public static final byte CONNECTING = 1;
+ public static final byte CONNECTED = 2;
+ static final class Lock {
+ }
+ Lock connectLock;
+
+ public ClientSOContainer(ISharedObjectContainerConfig config) {
+ super(config);
+ connection = null;
+ connectionState = UNCONNECTED;
+ connectLock = new Lock();
+ }
+ public final boolean isGroupServer() {
+ return false;
+ }
+ public final boolean isGroupManager() {
+ return false;
+ }
+ public ID getGroupID() {
+ return remoteServerID;
+ }
+ public void joinGroup(ID remoteSpace, Object data)
+ throws SharedObjectContainerJoinException {
+ try {
+ if (isClosing)
+ throw new IllegalStateException("container is closing");
+ ISynchAsynchConnection aConnection = getClientConnection(
+ remoteSpace, data);
+ if (aConnection == null) {
+ ConnectException c = new ConnectException("join failed to"
+ + ":" + remoteSpace.getName());
+ throw c;
+ }
+ ContainerMessage response;
+ synchronized (connectLock) {
+ if (isConnected()) {
+ killConnection(aConnection);
+ aConnection = null;
+ ConnectException c = new ConnectException(
+ "already connected to " + getGroupID());
+ throw c;
+ }
+ if (isConnecting()) {
+ killConnection(aConnection);
+ aConnection = null;
+ ConnectException c = new ConnectException(
+ "currently connecting");
+ throw c;
+ }
+ connectionState = CONNECTING;
+ connection = aConnection;
+ }
+ synchronized (aConnection) {
+ try {
+ Object connectData = getConnectData(remoteSpace, data);
+ response = (ContainerMessage) aConnection.connect(
+ remoteSpace, connectData, 0);
+ } catch (IOException e) {
+ synchronized (connectLock) {
+ killConnection(aConnection);
+ if (connection != aConnection) {
+ aConnection = null;
+ throw e;
+ }
+ connectionState = UNCONNECTED;
+ connection = null;
+ remoteServerID = null;
+ }
+ throw e;
+ }
+ synchronized (connectLock) {
+ // If not in correct state, disconnect and return
+ if (connection != aConnection) {
+ killConnection(aConnection);
+ aConnection = null;
+ ConnectException c = new ConnectException(
+ "join failed because not in correct state");
+ throw c;
+ }
+ ID serverID = null;
+ try {
+ serverID = acceptNewServer(response);
+ } catch (Exception e) {
+ killConnection(aConnection);
+ aConnection = null;
+ connection = null;
+ remoteServerID = null;
+ connectionState = UNCONNECTED;
+ ConnectException c = new ConnectException(
+ "join refused locally via acceptNewServer");
+ throw c;
+ }
+ aConnection.start();
+ remoteServerID = serverID;
+ connectionState = CONNECTED;
+ }
+ }
+ } catch (Exception e) {
+ throw new SharedObjectContainerJoinException("could not join", e);
+ }
+ }
+
+ protected void forwardExcluding(ID from, ID excluding, ContainerMessage data)
+ throws IOException {
+ // NOP
+ }
+ protected Serializable getConnectData(ID target, Object data) {
+ return ContainerMessage.makeJoinGroupMessage(getID(), target,
+ getNextSequenceNumber(), (Serializable) data);
+ }
+ protected Serializable getLeaveData(ID target) {
+ return null;
+ }
+ public void leaveGroup() {
+ synchronized (connectLock) {
+ // If we are currently connected
+ if (isConnected()) {
+ synchronized (connection) {
+ try {
+ connection.sendSynch(remoteServerID,
+ getBytesForObject(ContainerMessage
+ .makeLeaveGroupMessage(getID(),
+ remoteServerID,
+ getNextSequenceNumber(),
+ getLeaveData(remoteServerID))));
+ } catch (Exception e) {
+ }
+ synchronized (getGroupMembershipLock()) {
+ memberLeave(remoteServerID, connection);
+ }
+ }
+ }
+ connectionState = UNCONNECTED;
+ connection = null;
+ remoteServerID = null;
+ }
+ }
+
+ protected abstract ISynchAsynchConnection getClientConnection(
+ ID remoteSpace, Object data) throws ConnectionInstantiationException;
+
+ protected void handleChangeMsg(ID fromID, ID toID, long seqNum,
+ Serializable data) throws IOException {
+ ContainerMessage.ViewChangeMessage c = null;
+ // Check data in packge for validity
+ ID ids[] = null;
+ try {
+ c = (ContainerMessage.ViewChangeMessage) data;
+ if (fromID == null || c == null)
+ throw new Exception();
+ ids = c.changeIDs;
+ if (ids == null || ids[0] == null || !fromID.equals(remoteServerID))
+ throw new IOException();
+ } catch (Exception e) {
+ InvalidObjectException t = new InvalidObjectException("bad data"
+ + ":" + fromID + ":" + toID + ":" + seqNum);
+ throw t;
+ }
+ // Now actually add/remove member
+ Member m = new Member(ids[0]);
+ synchronized (getGroupMembershipLock()) {
+ if (c.add) {
+ groupManager.addMember(m);
+ } else
+ groupManager.removeMember(m);
+ }
+ }
+
+ protected void queueContainerMessage(ContainerMessage message)
+ throws IOException {
+ // Do it
+ connection.sendAsynch(message.getToContainerID(),
+ getBytesForObject(message));
+ }
+ protected void forwardExcluding(ID from, ID excluding, byte msg,
+ Serializable data) throws IOException { /* NOP */
+ }
+ protected void forwardToRemote(ID from, ID to, ContainerMessage message)
+ throws IOException { /* NOP */
+ }
+ protected ID getIDForConnection(IAsynchConnection conn) {
+ return remoteServerID;
+ }
+ protected void memberLeave(ID fromID, IAsynchConnection conn) {
+ if (fromID.equals(remoteServerID)) {
+ groupManager.removeAllMembers();
+ super.memberLeave(fromID, conn);
+ connectionState = UNCONNECTED;
+ connection = null;
+ remoteServerID = null;
+ } else if (fromID.equals(getID())) {
+ super.memberLeave(fromID, conn);
+ }
+ }
+ protected void sendMessage(ContainerMessage data) throws IOException {
+ // Get connect lock, then call super version
+ synchronized (connectLock) {
+ checkConnected();
+ super.sendMessage(data);
+ }
+ }
+ protected ID[] sendCreateMsg(ID toID, SharedObjectDescription createInfo)
+ throws IOException {
+ // Get connect lock, then call super version
+ synchronized (connectLock) {
+ checkConnected();
+ return super.sendCreateSharedObjectMessage(toID, createInfo);
+ }
+ }
+ protected void processDisconnect(DisconnectConnectionEvent evt) {
+ // Get connect lock, and just return if this connection has been
+ // terminated
+ synchronized (connectLock) {
+ super.processDisconnect(evt);
+ }
+ }
+ protected void processAsynchPacket(AsynchConnectionEvent evt)
+ throws IOException {
+ // Get connect lock, then call super version
+ synchronized (connectLock) {
+ checkConnected();
+ super.processAsynch(evt);
+ }
+ }
+ protected Serializable processSynch(SynchConnectionEvent evt)
+ throws IOException {
+ synchronized (connectLock) {
+ checkConnected();
+ IConnection conn = evt.getConnection();
+ if (connection != conn)
+ throw new ConnectException("not connected");
+ return super.processSynch(evt);
+ }
+ }
+
+ protected boolean isConnected() {
+ return (connectionState == CONNECTED);
+ }
+ protected boolean isConnecting() {
+ return (connectionState == CONNECTING);
+ }
+ private void checkConnected() throws ConnectException {
+ if (!isConnected())
+ throw new ConnectException("not connected");
+ }
+ protected ID acceptNewServer(ContainerMessage serverData) throws Exception {
+ ContainerMessage aPacket = serverData;
+ ID fromID = aPacket.getFromContainerID();
+
+ if (fromID == null)
+ throw new InvalidObjectException("server id is null");
+
+ ID[] ids = ((ContainerMessage.ViewChangeMessage) aPacket.getData()).changeIDs;
+ if (ids == null)
+ throw new java.io.InvalidObjectException("id array null");
+ for (int i = 0; i < ids.length; i++) {
+ ID id = ids[i];
+ if (id != null && !id.equals(getID()))
+ addNewRemoteMember(id, null);
+ }
+ return fromID;
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerInstantiator.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerInstantiator.java
new file mode 100644
index 000000000..6759a236e
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerInstantiator.java
@@ -0,0 +1,32 @@
+package org.eclipse.ecf.provider.generic;
+
+import org.eclipse.ecf.core.ISharedObjectContainer;
+import org.eclipse.ecf.core.SharedObjectContainerInstantiationException;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.provider.ISharedObjectContainerInstantiator;
+
+public class ContainerInstantiator implements ISharedObjectContainerInstantiator {
+
+ public ContainerInstantiator() {
+ super();
+ }
+
+ public ISharedObjectContainer makeInstance(Class[] argTypes, Object[] args)
+ throws SharedObjectContainerInstantiationException {
+ try {
+ Boolean isClient = null;
+ ID id = null;
+ isClient = (Boolean) args[0];
+ id = (ID) args[1];
+ ISharedObjectContainer result = null;
+ if (isClient.booleanValue()) {
+ return new TCPClientSOContainer(new SOContainerConfig(id));
+ } else {
+ return new TCPServerSOContainer(new SOContainerConfig(id));
+ }
+ } catch (Exception e) {
+ throw new SharedObjectContainerInstantiationException("Exception creating container",e);
+ }
+ }
+
+}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java
new file mode 100644
index 000000000..3728b67e0
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java
@@ -0,0 +1,166 @@
+package org.eclipse.ecf.provider.generic;
+
+import java.io.Serializable;
+import org.eclipse.ecf.core.identity.ID;
+
+public class ContainerMessage implements Serializable {
+
+ ID fromContainerID;
+ public ID toContainerID;
+ long sequence;
+ Serializable data;
+
+ /**
+ * @return Returns the data.
+ */
+ public Serializable getData() {
+ return data;
+ }
+ /**
+ * @param data
+ * The data to set.
+ */
+ public void setData(Serializable data) {
+ this.data = data;
+ }
+ /**
+ * @return Returns the fromContainerID.
+ */
+ public ID getFromContainerID() {
+ return fromContainerID;
+ }
+ /**
+ * @param fromContainerID
+ * The fromContainerID to set.
+ */
+ public void setFromContainerID(ID fromContainerID) {
+ this.fromContainerID = fromContainerID;
+ }
+ /**
+ * @return Returns the sequence.
+ */
+ public long getSequence() {
+ return sequence;
+ }
+ /**
+ * @param sequence
+ * The sequence to set.
+ */
+ public void setSequence(long sequence) {
+ this.sequence = sequence;
+ }
+ /**
+ * @return Returns the toContainerID.
+ */
+ public ID getToContainerID() {
+ return toContainerID;
+ }
+ /**
+ * @param toContainerID
+ * The toContainerID to set.
+ */
+ public void setToContainerID(ID toContainerID) {
+ this.toContainerID = toContainerID;
+ }
+ static ContainerMessage makeViewChangeMessage(ID from, ID to, long seq,
+ ID ids[], boolean add, Serializable data) {
+ return new ContainerMessage(from, to, seq, new ViewChangeMessage(ids,
+ add, data));
+ }
+ static ContainerMessage makeJoinGroupMessage(ID from, ID to, long seq,
+ Serializable data) {
+ return new ContainerMessage(from, to, seq, new JoinGroupMessage(data));
+ }
+ static ContainerMessage makeLeaveGroupMessage(ID from, ID to, long seq,
+ Serializable data) {
+ return new ContainerMessage(from, to, seq, new LeaveGroupMessage(data));
+ }
+ static ContainerMessage makeSharedObjectCreateMessage(ID from, ID to,
+ long seq, Serializable data) {
+ return new ContainerMessage(from, to, seq, new CreateMessage(data));
+ }
+ static ContainerMessage makeSharedObjectCreateResponseMessage(ID from,
+ ID to, long contSeq, ID soID, Throwable e, long sequence) {
+ return new ContainerMessage(from, to, contSeq,
+ new CreateResponseMessage(soID, e, sequence));
+ }
+ static ContainerMessage makeSharedObjectMessage(ID from, ID to, long seq,
+ ID fromSharedObject, Serializable data) {
+ return new ContainerMessage(from, to, seq, new SharedObjectMessage(
+ fromSharedObject, data));
+ }
+ static ContainerMessage makeSharedObjectDisposeMessage(ID from, ID to,
+ long seq, ID sharedObjectID) {
+ return new ContainerMessage(from, to, seq,
+ new SharedObjectDisposeMessage(sharedObjectID));
+ }
+ protected ContainerMessage(ID from, ID to, long seq, Serializable data) {
+ this.fromContainerID = from;
+ this.toContainerID = to;
+ this.sequence = seq;
+ this.data = data;
+ }
+
+ public static final class ViewChangeMessage implements Serializable {
+ ID changeIDs[];
+ boolean add;
+ Serializable data;
+ ViewChangeMessage(ID id[], boolean a, Serializable data) {
+ this.changeIDs = id;
+ this.add = a;
+ this.data = data;
+ }
+ }
+ public static final class CreateMessage implements Serializable {
+ Serializable data;
+ CreateMessage(Serializable data) {
+ this.data = data;
+ }
+ }
+ public static final class CreateResponseMessage implements Serializable {
+ ID sharedObjectID;
+ Throwable exception;
+ long sequence;
+ public CreateResponseMessage(ID objID, Throwable except, long sequence) {
+ this.sharedObjectID = objID;
+ this.exception = except;
+ this.sequence = sequence;
+ }
+ }
+ public static final class SharedObjectMessage implements Serializable {
+ Serializable data;
+ ID fromSharedObjectID;
+ SharedObjectMessage(ID fromSharedObject, Serializable data) {
+ this.fromSharedObjectID = fromSharedObject;
+ this.data = data;
+ }
+ }
+ public static final class SharedObjectDisposeMessage implements
+ Serializable {
+ ID sharedObjectID;
+ SharedObjectDisposeMessage(ID objID) {
+ this.sharedObjectID = objID;
+ }
+ }
+
+ public static final class JoinGroupMessage implements Serializable {
+ Serializable data;
+
+ public JoinGroupMessage(Serializable data) {
+ this.data = data;
+ }
+ public Serializable getData() {
+ return data;
+ }
+ }
+ public static final class LeaveGroupMessage implements Serializable {
+ Serializable data;
+
+ public LeaveGroupMessage(Serializable data) {
+ this.data = data;
+ }
+ public Serializable getData() {
+ return data;
+ }
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java
new file mode 100644
index 000000000..8dceedd7c
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Created on Dec 6, 2004
+ *
+ */
+package org.eclipse.ecf.provider.generic;
+
+import org.eclipse.ecf.core.util.EnqueuePredicate;
+import org.eclipse.ecf.core.util.Event;
+import org.eclipse.ecf.core.util.QueueEnqueue;
+import org.eclipse.ecf.core.util.QueueException;
+import org.eclipse.ecf.core.util.SimpleQueueImpl;
+
+public class QueueEnqueueImpl implements QueueEnqueue {
+
+ SimpleQueueImpl queue = null;
+
+ public QueueEnqueueImpl(SimpleQueueImpl impl) {
+ super();
+ this.queue = impl;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue(org.eclipse.ecf.core.util.Event)
+ */
+ public void enqueue(Event element) throws QueueException {
+ queue.enqueue(element);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue(org.eclipse.ecf.core.util.Event[])
+ */
+ public void enqueue(Event[] elements) throws QueueException {
+ if (elements != null) {
+ for (int i = 0; i < elements.length; i++) {
+ enqueue(elements[i]);
+ }
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue_prepare(org.eclipse.ecf.core.util.Event[])
+ */
+ public Object enqueue_prepare(Event[] elements) throws QueueException {
+ return elements;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue_commit(java.lang.Object)
+ */
+ public void enqueue_commit(Object enqueue_key) {
+ if (enqueue_key instanceof Event[]) {
+ Event[] events = (Event[]) enqueue_key;
+ try {
+ enqueue(events);
+ } catch (QueueException e) {
+ // this should not happen
+ e.printStackTrace(System.err);
+ }
+ }
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue_abort(java.lang.Object)
+ */
+ public void enqueue_abort(Object enqueue_key) {
+ // Do nothing
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue_lossy(org.eclipse.ecf.core.util.Event)
+ */
+ public boolean enqueue_lossy(Event element) {
+ queue.enqueue(element);
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.util.QueueEnqueue#setEnqueuePredicate(org.eclipse.ecf.core.util.EnqueuePredicate)
+ */
+ public void setEnqueuePredicate(EnqueuePredicate pred) {
+ // This queue does not support enqueue predicate
+ // So we do nothing
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.util.QueueEnqueue#getEnqueuePredicate()
+ */
+ public EnqueuePredicate getEnqueuePredicate() {
+ // We don't support enqueue predicate, so return null;
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.util.QueueEnqueue#size()
+ */
+ public int size() {
+ return queue.size();
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConfig.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConfig.java
new file mode 100644
index 000000000..a4b48b7cf
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Created on Nov 29, 2004
+ *
+ */
+package org.eclipse.ecf.provider.generic;
+
+import java.util.Map;
+
+import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.ISharedObjectContext;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.QueueEnqueue;
+
+public class SOConfig implements ISharedObjectConfig {
+
+ SOContainer container = null;
+ ID sharedObjectID;
+ ID homeContainerID;
+ boolean isActive;
+ Map properties;
+ SOContext standAloneContext;
+
+ public SOConfig(ID sharedObjectID, ID homeContainerID, SOContainer cont,
+ Map dict) {
+ super();
+ this.sharedObjectID = sharedObjectID;
+ this.homeContainerID = homeContainerID;
+ isActive = false;
+ properties = dict;
+ this.container = cont;
+ }
+
+ protected void makeActive(QueueEnqueue queue) {
+ isActive = true;
+ this.standAloneContext = new SOContext(sharedObjectID, homeContainerID,
+ container, properties, queue);
+ }
+
+ protected void makeInactive() {
+ this.standAloneContext.makeInactive();
+ this.standAloneContext = null;
+ isActive = false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectConfig#getSharedObjectID()
+ */
+ public ID getSharedObjectID() {
+ return sharedObjectID;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectConfig#getHomeContainerID()
+ */
+ public ID getHomeContainerID() {
+ return homeContainerID;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectConfig#getContext()
+ */
+ public ISharedObjectContext getContext() {
+ if (isActive) {
+ return null;
+ } else
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectConfig#getProperties()
+ */
+ public Map getProperties() {
+ return properties;
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConnector.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConnector.java
new file mode 100644
index 000000000..c5d78bf8f
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConnector.java
@@ -0,0 +1,116 @@
+/*
+ * Created on Dec 20, 2004
+ *
+ */
+package org.eclipse.ecf.provider.generic;
+
+import java.util.Enumeration;
+import java.util.Hashtable;
+
+import org.eclipse.ecf.core.ISharedObjectConnector;
+import org.eclipse.ecf.core.events.ISharedObjectEvent;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.AsynchResult;
+import org.eclipse.ecf.core.util.QueueEnqueue;
+import org.eclipse.ecf.core.util.QueueException;
+import org.eclipse.ecf.provider.generic.events.SharedObjectCallEvent;
+
+public class SOConnector implements ISharedObjectConnector {
+
+ ID sender;
+ Hashtable receiverQueues = null;
+
+ public SOConnector(ID sender, ID[] recv, QueueEnqueue[] queues) {
+ super();
+ this.receiverQueues = new Hashtable();
+ for (int i = 0; i < recv.length; i++) {
+ receiverQueues.put(recv[i], queues[i]);
+ }
+ }
+
+ protected void fireEvent(ISharedObjectEvent event) throws QueueException {
+ for (Enumeration e = receiverQueues.elements(); e.hasMoreElements();) {
+ QueueEnqueue queue = (QueueEnqueue) e.nextElement();
+ queue.enqueue(event);
+ }
+ }
+ protected void fireEvents(ISharedObjectEvent[] event) throws QueueException {
+ for (Enumeration e = receiverQueues.elements(); e.hasMoreElements();) {
+ QueueEnqueue queue = (QueueEnqueue) e.nextElement();
+ if (queue != null) {
+ queue.enqueue(event);
+ }
+ }
+ }
+ protected AsynchResult[] fireCallEvent(ISharedObjectEvent event)
+ throws QueueException {
+ AsynchResult[] results = new AsynchResult[receiverQueues.size()];
+ int i = 0;
+ for (Enumeration e = receiverQueues.elements(); e.hasMoreElements();) {
+ QueueEnqueue queue = (QueueEnqueue) e.nextElement();
+ results[i] = new AsynchResult();
+ queue.enqueue(new SharedObjectCallEvent(event
+ .getSenderSharedObjectID(), event, results[i]));
+ }
+ return results;
+ }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectConnector#getSender()
+ */
+ public ID getSender() {
+ return sender;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectConnector#getReceivers()
+ */
+ public ID[] getReceivers() {
+ return (ID[]) receiverQueues.keySet().toArray(
+ new ID[receiverQueues.size()]);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectConnector#enqueue(org.eclipse.ecf.core.events.ISharedObjectEvent)
+ */
+ public void enqueue(ISharedObjectEvent event) throws QueueException {
+ fireEvent(event);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectConnector#enqueue(org.eclipse.ecf.core.events.ISharedObjectEvent[])
+ */
+ public void enqueue(ISharedObjectEvent[] events) throws QueueException {
+ fireEvents(events);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectConnector#callAsynch(org.eclipse.ecf.core.events.ISharedObjectEvent)
+ */
+ public AsynchResult[] callAsynch(ISharedObjectEvent arg) throws Exception {
+ return fireCallEvent(arg);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectConnector#dispose()
+ */
+ public void dispose() {
+ if (receiverQueues != null) {
+ receiverQueues.clear();
+ receiverQueues = null;
+ }
+ sender = null;
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
new file mode 100644
index 000000000..fe39e8938
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
@@ -0,0 +1,675 @@
+/*
+ * Created on Dec 16, 2004
+ *
+ */
+package org.eclipse.ecf.provider.generic;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Vector;
+
+import org.eclipse.ecf.core.IOSGIService;
+import org.eclipse.ecf.core.ISharedObject;
+import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.ISharedObjectContainer;
+import org.eclipse.ecf.core.ISharedObjectContainerConfig;
+import org.eclipse.ecf.core.ISharedObjectContainerListener;
+import org.eclipse.ecf.core.ISharedObjectContainerTransaction;
+import org.eclipse.ecf.core.ISharedObjectManager;
+import org.eclipse.ecf.core.SharedObjectAddException;
+import org.eclipse.ecf.core.SharedObjectContainerJoinException;
+import org.eclipse.ecf.core.SharedObjectDescription;
+import org.eclipse.ecf.core.SharedObjectInitException;
+import org.eclipse.ecf.core.comm.AsynchConnectionEvent;
+import org.eclipse.ecf.core.comm.ConnectionEvent;
+import org.eclipse.ecf.core.comm.DisconnectConnectionEvent;
+import org.eclipse.ecf.core.comm.IConnection;
+import org.eclipse.ecf.core.comm.ISynchAsynchConnectionEventHandler;
+import org.eclipse.ecf.core.comm.SynchConnectionEvent;
+import org.eclipse.ecf.core.events.IContainerEvent;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.Event;
+import org.eclipse.ecf.provider.generic.gmm.Member;
+
+public abstract class SOContainer implements ISharedObjectContainer {
+
+ public static final String DEFAULT_OBJECT_ARG_KEY = SOContainer.class
+ .getName()
+ + ".sharedobjectargs";
+ public static final String DEFAULT_OBJECT_ARGTYPES_KEY = SOContainer.class
+ .getName()
+ + ".sharedobjectargs";
+
+ private long sequenceNumber = 0L;
+ private Vector listeners = null;
+
+ protected ISharedObjectContainerConfig config = null;
+ protected SOContainerGMM groupManager = null;
+ protected ThreadGroup loadingThreadGroup = null;
+ protected ThreadGroup sharedObjectThreadGroup = null;
+ protected SOManager sharedObjectManager = null;
+ protected boolean isClosing = false;
+ protected MessageReceiver receiver;
+
+ protected byte[] getBytesForObject(Serializable obj) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(obj);
+ return bos.toByteArray();
+ }
+
+ protected void killConnection(IConnection conn) {
+ try {
+ if (conn != null)
+ conn.disconnect();
+ } catch (IOException e) {
+ // XXX report
+ }
+ }
+ protected void memberLeave(ID target, IConnection conn) {
+ if (conn != null)
+ killConnection(conn);
+ }
+ protected void fireContainerEvent(IContainerEvent event) {
+ synchronized (listeners) {
+ for (Iterator i = listeners.iterator(); i.hasNext();)
+ ((ISharedObjectContainerListener) i.next()).handleEvent(event);
+ }
+ }
+ protected long getNextSequenceNumber() {
+ if (sequenceNumber == Long.MAX_VALUE) {
+ sequenceNumber = 0;
+ return sequenceNumber;
+ } else
+ return sequenceNumber++;
+ }
+ protected Object getGroupMembershipLock() {
+ return groupManager;
+ }
+ public ID[] getOtherMemberIDs() {
+ return groupManager.getOtherMemberIDs();
+ }
+ protected ISharedObject getSharedObject(ID id) {
+ SOWrapper wrap = getSharedObjectWrapper(id);
+ if (wrap == null)
+ return null;
+ else
+ return wrap.getSharedObject();
+ }
+ protected ISharedObject addSharedObjectAndWait(SharedObjectDescription sd,
+ ISharedObject s, ISharedObjectContainerTransaction t)
+ throws Exception {
+ if (sd.getID() == null || s == null)
+ return null;
+ ISharedObject so = addSharedObject0(sd, s);
+ // Wait right here until committed
+ if (t != null)
+ t.waitToCommit();
+ return s;
+ }
+ protected ISharedObject addSharedObject0(SharedObjectDescription sd,
+ ISharedObject s) throws Exception {
+ addSharedObjectWrapper(makeNewSharedObjectWrapper(sd, s));
+ return s;
+ }
+ protected SOWrapper makeNewSharedObjectWrapper(SharedObjectDescription sd,
+ ISharedObject s) {
+ SOConfig newConfig = makeNewSharedObjectConfig(sd, this);
+ return new SOWrapper(newConfig, s, this);
+ }
+ protected SOConfig makeNewSharedObjectConfig(SharedObjectDescription sd,
+ SOContainer cont) {
+ ID homeID = sd.getHomeID();
+ if (homeID == null)
+ homeID = getID();
+ return new SOConfig(sd.getID(), homeID, this, sd.getProperties());
+ }
+ protected SOWrapper getSharedObjectWrapper(ID id) {
+ return groupManager.getFromActive(id);
+ }
+ protected void addSharedObjectWrapper(SOWrapper wrapper) throws Exception {
+ if (wrapper == null)
+ return;
+ ID id = wrapper.getObjID();
+ synchronized (getGroupMembershipLock()) {
+ Object obj = groupManager.getFromAny(id);
+ if (obj != null) {
+ throw new SharedObjectAddException("SharedObject with id "
+ + id.getName() + " already in container");
+ }
+ // Call initialize. If this throws it halts everything
+ wrapper.init();
+ // Put in table
+ groupManager.addSharedObjectToActive(wrapper);
+ }
+ }
+ protected ISharedObject removeSharedObject(ID id) {
+ synchronized (getGroupMembershipLock()) {
+ SOWrapper wrap = groupManager.getFromActive(id);
+ if (wrap == null)
+ return null;
+ groupManager.removeSharedObject(id);
+ return wrap.getSharedObject();
+ }
+ }
+ protected boolean addNewRemoteMember(ID memberID, Object data) {
+ return groupManager.addMember(new Member(memberID, data));
+ }
+ abstract protected void queueContainerMessage(ContainerMessage mess)
+ throws IOException;
+ abstract protected void forwardToRemote(ID from, ID to,
+ ContainerMessage data) throws IOException;
+ abstract protected void forwardExcluding(ID from, ID excluding,
+ ContainerMessage data) throws IOException;
+ protected final void forward(ID fromID, ID toID, ContainerMessage data)
+ throws IOException {
+ if (toID == null) {
+ forwardExcluding(fromID, fromID, data);
+ } else {
+ forwardToRemote(fromID, toID, data);
+ }
+ }
+ protected boolean removeRemoteMember(ID remoteMember) {
+ return groupManager.removeMember(remoteMember);
+ }
+ protected void sendMessage(ContainerMessage data) throws IOException {
+ synchronized (getGroupMembershipLock()) {
+ ID ourID = getID();
+ // We don't send to ourselves
+ if (!ourID.equals(data.getToContainerID()))
+ queueContainerMessage(data);
+ }
+ }
+ protected ID[] sendCreateSharedObjectMessage(ID toContainerID,
+ SharedObjectDescription sd) throws IOException {
+ ID[] returnIDs = null;
+ if (toContainerID == null) {
+ synchronized (getGroupMembershipLock()) {
+ // Send message to all
+ sendMessage(ContainerMessage.makeSharedObjectCreateMessage(
+ getID(), toContainerID, getNextSequenceNumber(), sd));
+ returnIDs = getOtherMemberIDs();
+ }
+ } else {
+ // If the create msg is directed to this space, no msg will be sent
+ if (getID().equals(toContainerID)) {
+ returnIDs = new ID[0];
+ } else {
+ sendMessage(ContainerMessage.makeSharedObjectCreateMessage(
+ getID(), toContainerID, getNextSequenceNumber(), sd));
+ returnIDs = new ID[1];
+ returnIDs[0] = toContainerID;
+ }
+ }
+ return returnIDs;
+ }
+ protected void sendCreateResponseSharedObjectMessage(ID toContainerID,
+ ID fromSharedObject, Throwable t, long ident) throws IOException {
+ sendMessage(ContainerMessage.makeSharedObjectCreateResponseMessage(
+ getID(), toContainerID, getNextSequenceNumber(),
+ fromSharedObject, t, ident));
+ }
+ protected void sendSharedObjectMessage(ID toContainerID,
+ ID fromSharedObject, Serializable data) throws IOException {
+ sendMessage(ContainerMessage.makeSharedObjectMessage(getID(),
+ toContainerID, getNextSequenceNumber(), fromSharedObject, data));
+ }
+ protected void sendDisposeSharedObjectMessage(ID toContainerID,
+ ID fromSharedObject) throws IOException {
+ sendMessage(ContainerMessage.makeSharedObjectDisposeMessage(getID(),
+ toContainerID, getNextSequenceNumber(), fromSharedObject));
+ }
+ public SOContainer(ISharedObjectContainerConfig config) {
+ if (config == null)
+ throw new InstantiationError("config must not be null");
+ this.config = config;
+ groupManager = new SOContainerGMM(this, new Member(config.getID()));
+ sharedObjectManager = new SOManager(this);
+ loadingThreadGroup = getLoadingThreadGroup();
+ sharedObjectThreadGroup = getSharedObjectThreadGroup();
+ listeners = new Vector();
+ receiver = new MessageReceiver();
+ }
+ protected ISynchAsynchConnectionEventHandler getReceiver() {
+ return receiver;
+ }
+ protected boolean isClosing() {
+ return isClosing;
+ }
+ protected void setIsClosing() {
+ isClosing = true;
+ }
+ protected ThreadGroup getLoadingThreadGroup() {
+ return new ThreadGroup(getID() + ":Loading");
+ }
+ protected ThreadGroup getSharedObjectThreadGroup() {
+ return new ThreadGroup(getID() + ":SOs");
+ }
+
+ public ID getID() {
+ return config.getID();
+ }
+
+ protected int getMaxGroupMembers() {
+ return groupManager.getMaxMembers();
+ }
+ protected void setMaxGroupMembers(int max) {
+ groupManager.setMaxMembers(max);
+ }
+
+ protected void notifySharedObjectActivated(ID sharedObjectID) {
+ groupManager.notifyOthersActivated(sharedObjectID);
+ }
+ protected void notifySharedObjectDeactivated(ID sharedObjectID) {
+ groupManager.notifyOthersDeactivated(sharedObjectID);
+ }
+
+ protected boolean destroySharedObject(ID sharedObjectID) {
+ return groupManager.removeSharedObject(sharedObjectID);
+ }
+
+ protected IOSGIService getOSGIServiceInterface() {
+ return null;
+ }
+ protected void sendCreate(ID sharedObjectID, ID toContainerID,
+ SharedObjectDescription sd) throws IOException {
+ sendCreateSharedObjectMessage(toContainerID, sd);
+ }
+ protected void sendDispose(ID toContainerID, ID sharedObjectID)
+ throws IOException {
+ sendDisposeSharedObjectMessage(toContainerID, sharedObjectID);
+ }
+ protected void sendMessage(ID toContainerID, ID sharedObjectID,
+ Object message) throws IOException {
+ if (message == null)
+ return;
+ if (message instanceof Serializable)
+ throw new NotSerializableException(message.getClass().getName());
+ sendSharedObjectMessage(toContainerID, sharedObjectID,
+ (Serializable) message);
+ }
+ protected void sendCreateResponse(ID homeID, ID sharedObjectID,
+ Throwable t, long identifier) throws IOException {
+ sendCreateResponseSharedObjectMessage(homeID, sharedObjectID, t,
+ identifier);
+ }
+ protected Thread getNewSharedObjectThread(ID sharedObjectID,
+ Runnable runnable) {
+ return new Thread(sharedObjectThreadGroup, runnable, getID().getName()
+ + ";" + sharedObjectID.getName());
+ }
+ protected ISharedObject load(SharedObjectDescription sd) throws Exception {
+ return sharedObjectManager.loadSharedObject(sd);
+ }
+ protected ID[] getSharedObjectIDs() {
+ return groupManager.getSharedObjectIDs();
+ }
+ protected SOConfig makeSharedObjectConfig(SharedObjectDescription sd,
+ ISharedObject obj) {
+ return new SOConfig(sd.getID(), sd.getHomeID(), this, sd
+ .getProperties());
+ }
+ protected void moveFromLoadingToActive(SOWrapper wrap) {
+ groupManager.moveSharedObjectFromLoadingToActive(wrap);
+ }
+ protected void removeFromLoading(ID id) {
+ groupManager.removeSharedObjectFromLoading(id);
+ }
+
+ protected void processDisconnect(DisconnectConnectionEvent e) {
+ // XXX TODO
+ }
+ protected void processAsynch(AsynchConnectionEvent e) {
+ // XXX TODO
+ }
+ protected Serializable processSynch(SynchConnectionEvent e)
+ throws IOException {
+ // XXX TODO
+ return null;
+ }
+ class LoadingSharedObject implements ISharedObject {
+
+ SharedObjectDescription description;
+ Thread runner = null;
+
+ LoadingSharedObject(SharedObjectDescription sd) {
+ this.description = sd;
+ }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#init(org.eclipse.ecf.core.ISharedObjectConfig)
+ */
+ public void init(ISharedObjectConfig initData)
+ throws SharedObjectInitException {
+ }
+ ID getID() {
+ return description.getID();
+ }
+
+ ID getHomeID() {
+ return description.getHomeID();
+ }
+
+ void start() {
+ if (runner == null) {
+ runner = (Thread) AccessController
+ .doPrivileged(new PrivilegedAction() {
+ public Object run() {
+ return getThread();
+ }
+ });
+ runner.setDaemon(true);
+ runner.start();
+ }
+
+ }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#handleEvent(org.eclipse.ecf.core.util.Event)
+ */
+ public void handleEvent(Event event) {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[])
+ */
+ public void handleEvents(Event[] events) {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID)
+ */
+ public void dispose(ID containerID) {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class)
+ */
+ public Object getAdapter(Class clazz) {
+ return null;
+ }
+ Thread getThread() {
+ return new Thread(loadingThreadGroup, new Runnable() {
+ public void run() {
+ try {
+ if (Thread.currentThread().isInterrupted()
+ || isClosing())
+ throw new InterruptedException(
+ "Loading interrupted for object "
+ + getID().getName());
+ // First load given object
+ ISharedObject obj = load(description);
+ // Get config info for new object
+ SOConfig aConfig = makeSharedObjectConfig(description,
+ obj);
+ // Call init method on new object.
+ obj.init(aConfig);
+ // Check to make sure thread has not been
+ // interrupted...if it has, throw
+ if (Thread.currentThread().isInterrupted()
+ || isClosing())
+ throw new InterruptedException(
+ "Loading interrupted for object "
+ + getID().getName());
+
+ // Create meta object and move from loading to active
+ // list.
+ SOContainer.this.moveFromLoadingToActive(new SOWrapper(
+ aConfig, obj, SOContainer.this));
+ } catch (Exception e) {
+ SOContainer.this.removeFromLoading(getID());
+ try {
+ sendCreateResponse(getHomeID(), getID(), e,
+ description.getIdentifier());
+ } catch (Exception e1) {
+ }
+ }
+ }
+ }, "LRunner" + getID().getName());
+ }
+
+ }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#getConfig()
+ */
+ public ISharedObjectContainerConfig getConfig() {
+ return config;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#addListener(org.eclipse.ecf.core.ISharedObjectContainerListener,
+ * java.lang.String)
+ */
+ public void addListener(ISharedObjectContainerListener l, String filter) {
+ synchronized (listeners) {
+ listeners.add(l);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#removeListener(org.eclipse.ecf.core.ISharedObjectContainerListener)
+ */
+ public void removeListener(ISharedObjectContainerListener l) {
+ synchronized (listeners) {
+ listeners.remove(l);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#dispose(long)
+ */
+ public void dispose(long waittime) {
+ isClosing = true;
+ // XXX Notify listeners that we're going away
+ // Clear group manager
+ groupManager.removeAllMembers();
+ // Clear shared object manager
+ sharedObjectManager.dispose();
+ try {
+ synchronized (this) {
+ wait(waittime);
+ }
+ } catch (InterruptedException e) {
+
+ }
+ sharedObjectThreadGroup.interrupt();
+ loadingThreadGroup.interrupt();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#joinGroup(org.eclipse.ecf.core.identity.ID,
+ * java.lang.Object)
+ */
+ public abstract void joinGroup(ID groupID, Object loginData)
+ throws SharedObjectContainerJoinException;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#leaveGroup()
+ */
+ public abstract void leaveGroup();
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#getGroupID()
+ */
+ public abstract ID getGroupID();
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#getGroupMemberIDs()
+ */
+ public ID[] getGroupMemberIDs() {
+ return groupManager.getMemberIDs();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#isGroupManager()
+ */
+ public abstract boolean isGroupManager();
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#isGroupServer()
+ */
+ public abstract boolean isGroupServer();
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#getSharedObjectManager()
+ */
+ public ISharedObjectManager getSharedObjectManager() {
+ return sharedObjectManager;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainer#getAdapter(java.lang.Class)
+ */
+ public Object getAdapter(Class adapter) {
+ return null;
+ }
+
+ protected ClassLoader getClassLoaderForContainer() {
+ return this.getClass().getClassLoader();
+ }
+ /**
+ * @param sd
+ * @return
+ */
+ protected ClassLoader getClassLoaderForSharedObject(
+ SharedObjectDescription sd) {
+ if (sd != null) {
+ ClassLoader cl = sd.getClassLoader();
+ if (cl != null)
+ return cl;
+ else
+ return getClassLoaderForContainer();
+ } else
+ return getClassLoaderForContainer();
+ }
+ /**
+ * @param sd
+ * @return
+ */
+ public Object[] getArgsFromProperties(SharedObjectDescription sd) {
+ if (sd == null)
+ return null;
+ Map aMap = sd.getProperties();
+ if (aMap == null)
+ return null;
+ Object obj = aMap.get(DEFAULT_OBJECT_ARG_KEY);
+ if (obj == null)
+ return null;
+ if (obj instanceof Object[]) {
+ Object[] ret = (Object[]) obj;
+ aMap.remove(DEFAULT_OBJECT_ARG_KEY);
+ return ret;
+ } else
+ return null;
+ }
+ /**
+ * @param sd
+ * @return
+ */
+ public String[] getArgTypesFromProperties(SharedObjectDescription sd) {
+ if (sd == null)
+ return null;
+ Map aMap = sd.getProperties();
+ if (aMap == null)
+ return null;
+ Object obj = aMap.get(DEFAULT_OBJECT_ARGTYPES_KEY);
+ if (obj == null)
+ return null;
+ if (obj instanceof String[]) {
+ String[] ret = (String[]) obj;
+ aMap.remove(DEFAULT_OBJECT_ARGTYPES_KEY);
+ return ret;
+ } else
+ return null;
+ }
+
+ class MessageReceiver implements ISynchAsynchConnectionEventHandler {
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.internal.comm.ISynchConnectionEventHandler#handleSynchEvent(org.eclipse.ecf.internal.comm.SynchConnectionEvent)
+ */
+ public Object handleSynchEvent(SynchConnectionEvent event)
+ throws IOException {
+ return processSynch(event);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#handleSuspectEvent(org.eclipse.ecf.internal.comm.ConnectionEvent)
+ */
+ public boolean handleSuspectEvent(ConnectionEvent event) {
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#handleDisconnectEvent(org.eclipse.ecf.internal.comm.ConnectionEvent)
+ */
+ public void handleDisconnectEvent(DisconnectConnectionEvent event) {
+ processDisconnect(event);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#getAdapter(java.lang.Class)
+ */
+ public Object getAdapter(Class clazz) {
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.internal.comm.IAsynchConnectionEventHandler#handleAsynchEvent(org.eclipse.ecf.internal.comm.AsynchConnectionEvent)
+ */
+ public void handleAsynchEvent(AsynchConnectionEvent event)
+ throws IOException {
+ processAsynch(event);
+ }
+
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerConfig.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerConfig.java
new file mode 100644
index 000000000..dd493c2b7
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerConfig.java
@@ -0,0 +1,49 @@
+package org.eclipse.ecf.provider.generic;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.eclipse.ecf.core.ISharedObjectContainerConfig;
+import org.eclipse.ecf.core.identity.ID;
+
+public class SOContainerConfig implements ISharedObjectContainerConfig {
+
+ ID id;
+ Map properties;
+
+ public SOContainerConfig(ID id, Map props) {
+ this.id = id;
+ this.properties = props;
+ }
+ public SOContainerConfig(ID id) {
+ this.id = id;
+ this.properties = new HashMap();
+ }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainerConfig#getProperties()
+ */
+ public Map getProperties() {
+ return properties;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContainerConfig#getAdapter(java.lang.Class)
+ */
+ public Object getAdapter(Class clazz) {
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.IIdentifiable#getID()
+ */
+ public ID getID() {
+ return id;
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGMM.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGMM.java
new file mode 100644
index 000000000..6020c10fa
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGMM.java
@@ -0,0 +1,323 @@
+package org.eclipse.ecf.provider.generic;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.TreeMap;
+
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.provider.generic.gmm.GMMImpl;
+import org.eclipse.ecf.provider.generic.gmm.Member;
+import org.eclipse.ecf.provider.generic.gmm.MemberChanged;
+
+class SOContainerGMM implements Observer {
+
+ public static boolean DEBUG = false;
+
+ SOContainer container;
+ Member localMember;
+ GMMImpl groupManager;
+ // Maximum number of members. Default is -1 (no maximum).
+ int maxMembers = -1;
+ TreeMap loading, active;
+
+ SOContainerGMM(SOContainer space, Member local) {
+ container = space;
+ groupManager = new GMMImpl();
+ groupManager.addObserver(this);
+ loading = new TreeMap();
+ active = new TreeMap();
+ localMember = local;
+ addMember(local);
+ }
+
+ ID[] getSharedObjectIDs() {
+ return getActiveKeys();
+ }
+ synchronized boolean addMember(Member m) {
+ if (maxMembers > 0 && getSize() > maxMembers) {
+ return false;
+ } else {
+ return groupManager.addMember(m);
+ }
+ }
+ synchronized int setMaxMembers(int max) {
+ int old = maxMembers;
+ maxMembers = max;
+ return old;
+ }
+ synchronized int getMaxMembers() {
+ return maxMembers;
+ }
+
+ synchronized boolean removeMember(Member m) {
+ boolean res = groupManager.removeMember(m);
+ if (res) {
+ removeSharedObjects(m);
+ }
+ return res;
+ }
+
+ synchronized boolean removeMember(ID id) {
+ Member m = getMemberForID(id);
+ if (m == null)
+ return false;
+ return removeMember(m);
+ }
+
+ void removeAllMembers() {
+ removeAllMembers(null);
+ }
+
+ void removeNonLocalMembers() {
+ removeAllMembers(localMember);
+ }
+
+ synchronized void removeAllMembers(Member exception) {
+ Object m[] = getMembers();
+ for (int i = 0; i < m.length; i++) {
+ Member mem = (Member) m[i];
+ if (exception == null || !exception.equals(mem))
+ removeMember(mem);
+ }
+ }
+
+ synchronized Object[] getMembers() {
+ return groupManager.getMembers();
+ }
+
+ synchronized ID[] getOtherMemberIDs() {
+ return groupManager.getMemberIDs(localMember.getID());
+ }
+
+ synchronized ID[] getMemberIDs() {
+ return groupManager.getMemberIDs(null);
+ }
+
+ synchronized Member getMemberForID(ID id) {
+ Member newMem = new Member(id);
+ for (Iterator i = iterator(); i.hasNext();) {
+ Member oldMem = (Member) i.next();
+ if (newMem.equals(oldMem))
+ return oldMem;
+ }
+ return null;
+ }
+
+ synchronized int getSize() {
+ return groupManager.getSize();
+ }
+
+ synchronized boolean containsMember(Member m) {
+ return groupManager.containsMember(m);
+ }
+
+ synchronized Iterator iterator() {
+ return groupManager.iterator();
+ }
+
+ // End group membership change methods
+
+ synchronized boolean addRepObj(SOWrapper ro) {
+ if (getFromAny(ro.getObjID()) != null)
+ return false;
+ addSharedObjectToActive(ro);
+ return true;
+ }
+
+ synchronized boolean addLoadingSharedObject(
+ SOContainer.LoadingSharedObject lso) {
+ if (getFromAny(lso.getID()) != null)
+ return false;
+ loading.put(lso.getID(), new SOWrapper(lso, container));
+ // And start the thing
+ lso.start();
+ return true;
+ }
+
+ synchronized void moveSharedObjectFromLoadingToActive(SOWrapper ro) {
+ if (removeSharedObjectFromLoading(ro.getObjID()))
+ addSharedObjectToActive(ro);
+ }
+
+ boolean removeSharedObjectFromLoading(ID id) {
+ if (loading.remove(id) != null) {
+ return true;
+ } else
+ return false;
+ }
+
+ synchronized ID[] getActiveKeys() {
+ return (ID[]) active.keySet().toArray(new ID[0]);
+ }
+
+ void addSharedObjectToActive(SOWrapper so) {
+ ID[] ids = getActiveKeys();
+ active.put(so.getObjID(), so);
+ so.activated(ids);
+ }
+
+ synchronized void notifyOthersActivated(ID id) {
+ notifyOtherChanged(id, active, true);
+ }
+
+ synchronized void notifyOthersDeactivated(ID id) {
+ notifyOtherChanged(id, active, false);
+ }
+
+ void notifyOtherChanged(ID id, TreeMap aMap, boolean activated) {
+ for (Iterator i = aMap.values().iterator(); i.hasNext();) {
+ SOWrapper other = (SOWrapper) i.next();
+ if (!id.equals(other.getObjID())) {
+ other.otherChanged(id, activated);
+ }
+ }
+ }
+
+ synchronized boolean removeSharedObject(ID id) {
+ SOWrapper ro = removeFromMap(id, active);
+ if (ro == null)
+ return false;
+ ro.deactivated();
+ return true;
+ }
+
+ synchronized SOWrapper getFromMap(ID objID, TreeMap aMap) {
+ return (SOWrapper) aMap.get(objID);
+ }
+
+ synchronized SOWrapper removeFromMap(ID objID, TreeMap aMap) {
+ return (SOWrapper) aMap.remove(objID);
+ }
+
+ SOWrapper getFromLoading(ID objID) {
+ return getFromMap(objID, loading);
+ }
+
+ SOWrapper getFromActive(ID objID) {
+ return getFromMap(objID, active);
+ }
+
+ synchronized SOWrapper getFromAny(ID objID) {
+ SOWrapper ro = getFromMap(objID, active);
+ if (ro != null)
+ return ro;
+ ro = getFromMap(objID, loading);
+ return ro;
+ }
+
+ // Notification methods
+ void notifyAllOfMemberChange(Member m, TreeMap map, boolean add) {
+ for (Iterator i = map.values().iterator(); i.hasNext();) {
+ SOWrapper ro = (SOWrapper) i.next();
+ ro.memberChanged(m, add);
+ }
+ }
+
+ public void update(Observable o, Object arg) {
+ MemberChanged mc = (MemberChanged) arg;
+ notifyAllOfMemberChange(mc.getMember(), active, mc.getAdded());
+ }
+
+ synchronized void removeSharedObjects(Member m) {
+ removeSharedObjects(m, true);
+ }
+
+ synchronized void clear() {
+ removeSharedObjects(null, true);
+ }
+
+ void removeSharedObjects(Member m, boolean match) {
+ HashSet set = getRemoveIDs(m.getID(), match);
+ Iterator i = set.iterator();
+
+ while (i.hasNext()) {
+ ID removeID = (ID) i.next();
+ if (isLoading(removeID)) {
+ removeSharedObjectFromLoading(removeID);
+ } else {
+ container.destroySharedObject(removeID);
+ }
+ }
+ }
+
+ HashSet getRemoveIDs(ID homeID, boolean match) {
+ HashSet aSet = new HashSet();
+ for (Iterator i = new DestroyIterator(loading, homeID, match); i
+ .hasNext();) {
+ aSet.add(i.next());
+ }
+ for (Iterator i = new DestroyIterator(active, homeID, match); i
+ .hasNext();) {
+ aSet.add(i.next());
+ }
+ return aSet;
+ }
+
+ synchronized boolean isActive(ID id) {
+ return active.containsKey(id);
+ }
+
+ synchronized boolean isLoading(ID id) {
+ return loading.containsKey(id);
+ }
+
+ boolean debug() {
+ return DEBUG;
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("RSM[");
+ sb.append(groupManager);
+ sb.append(";L:").append(loading);
+ sb.append(";A:").append(active).append("]");
+ return sb.toString();
+ }
+
+}
+
+class DestroyIterator implements Iterator {
+ ID next;
+ ID homeID;
+ Iterator i;
+ boolean match;
+
+ public DestroyIterator(TreeMap map, ID hID, boolean m) {
+ i = map.values().iterator();
+ homeID = hID;
+ next = null;
+ match = m;
+ }
+
+ public boolean hasNext() {
+ if (next == null)
+ next = getNext();
+ return (next != null);
+ }
+
+ public Object next() {
+ if (hasNext()) {
+ ID value = next;
+ next = null;
+ return value;
+ } else {
+ throw new java.util.NoSuchElementException();
+ }
+ }
+
+ ID getNext() {
+ while (i.hasNext()) {
+ SOWrapper ro = (SOWrapper) i.next();
+ if (homeID == null || (match ^ !ro.getHomeID().equals(homeID))) {
+ return ro.getObjID();
+ }
+ }
+ return null;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGroup.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGroup.java
new file mode 100644
index 000000000..01eb178be
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGroup.java
@@ -0,0 +1,45 @@
+package org.eclipse.ecf.provider.generic;
+
+import java.util.TreeMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Iterator;
+
+public class SOContainerGroup {
+
+ String name;
+ protected Map map;
+
+ public SOContainerGroup(String name) {
+ this.name = name;
+ map = Collections.synchronizedMap(new TreeMap());
+ }
+ public String add(String key, SOContainer aSpace) {
+ if (key == null || aSpace == null)
+ return null;
+ map.put(key, aSpace);
+ return key;
+ }
+
+ public SOContainer get(String key) {
+ if (key == null)
+ return null;
+ return (SOContainer) map.get(key);
+ }
+ public SOContainer remove(String key) {
+ if (key == null)
+ return null;
+ return (SOContainer) map.remove(key);
+ }
+ public boolean contains(String key) {
+ if (key == null)
+ return false;
+ return map.containsKey(key);
+ }
+ public String getName() {
+ return name;
+ }
+ public Iterator elements() {
+ return map.values().iterator();
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContext.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContext.java
new file mode 100644
index 000000000..8501bd105
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContext.java
@@ -0,0 +1,218 @@
+/*
+ * Created on Dec 6, 2004
+ *
+ */
+package org.eclipse.ecf.provider.generic;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.eclipse.ecf.core.IOSGIService;
+import org.eclipse.ecf.core.ISharedObjectContext;
+import org.eclipse.ecf.core.ISharedObjectManager;
+import org.eclipse.ecf.core.SharedObjectContainerJoinException;
+import org.eclipse.ecf.core.SharedObjectDescription;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.QueueEnqueue;
+
+public class SOContext implements ISharedObjectContext {
+
+ SOContainer container = null;
+ ID sharedObjectID;
+ ID homeContainerID;
+ boolean isActive;
+ Map properties;
+ QueueEnqueue queue;
+
+ public SOContext(ID objID, ID homeID, SOContainer cont, Map props,
+ QueueEnqueue queue) {
+ super();
+ this.sharedObjectID = objID;
+ this.homeContainerID = homeID;
+ this.container = cont;
+ this.properties = props;
+ this.queue = queue;
+ }
+ protected synchronized void makeInactive() {
+ container = null;
+ properties = null;
+ queue = null;
+ }
+ protected synchronized boolean isInactive() {
+ return (container == null);
+ }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#getContainerID()
+ */
+ public synchronized ID getLocalContainerID() {
+ if (isInactive()) {
+ return null;
+ }
+ return container.getConfig().getID();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#getSharedObjectManager()
+ */
+ public synchronized ISharedObjectManager getSharedObjectManager() {
+ if (isInactive()) {
+ return null;
+ }
+ return container.getSharedObjectManager();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#getQueue()
+ */
+ public synchronized QueueEnqueue getQueue() {
+ if (isInactive()) {
+ return null;
+ }
+ return queue;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#joinGroup(org.eclipse.ecf.core.identity.ID,
+ * java.lang.Object)
+ */
+ public synchronized void joinGroup(ID groupID, Object loginData)
+ throws SharedObjectContainerJoinException {
+ if (isInactive()) {
+ return;
+ } else
+ container.joinGroup(groupID, loginData);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#leaveGroup()
+ */
+ public synchronized void leaveGroup() {
+ if (isInactive()) {
+ return;
+ } else
+ container.leaveGroup();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#getGroupID()
+ */
+ public synchronized ID getGroupID() {
+ if (isInactive()) {
+ return null;
+ } else
+ return container.getGroupID();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#isGroupManager()
+ */
+ public synchronized boolean isGroupManager() {
+ if (isInactive()) {
+ return false;
+ } else
+ return container.isGroupManager();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#isGroupServer()
+ */
+ public synchronized boolean isGroupServer() {
+ if (isInactive()) {
+ return false;
+ } else
+ return container.isGroupManager();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#getGroupMembership()
+ */
+ public synchronized ID[] getGroupMemberIDs() {
+ if (isInactive()) {
+ return null;
+ } else
+ return container.getGroupMemberIDs();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#sendCreate(org.eclipse.ecf.core.identity.ID,
+ * org.eclipse.ecf.core.SharedObjectDescription)
+ */
+ public synchronized void sendCreate(ID toContainerID,
+ SharedObjectDescription sd) throws IOException {
+ if (isInactive()) {
+ return;
+ } else {
+ container.sendCreate(sharedObjectID, toContainerID, sd);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#sendDispose(org.eclipse.ecf.core.identity.ID)
+ */
+ public synchronized void sendDispose(ID toContainerID) throws IOException {
+ if (isInactive()) {
+ return;
+ } else {
+ container.sendDispose(toContainerID, sharedObjectID);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#sendMessage(org.eclipse.ecf.core.identity.ID,
+ * java.lang.Object)
+ */
+ public void sendMessage(ID toContainerID, Object data) throws IOException {
+ if (isInactive()) {
+ return;
+ } else {
+ container.sendMessage(toContainerID, sharedObjectID, data);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#getAdapter(java.lang.Class)
+ */
+ public Object getAdapter(Class clazz) {
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectContext#getServiceAccess()
+ */
+ public IOSGIService getServiceAccess() {
+ if (isInactive()) {
+ return null;
+ } else {
+ return container.getOSGIServiceInterface();
+ }
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java
new file mode 100644
index 000000000..c8b6b1331
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java
@@ -0,0 +1,264 @@
+/*
+ * Created on Dec 20, 2004
+ *
+ */
+package org.eclipse.ecf.provider.generic;
+
+import java.lang.reflect.Constructor;
+import java.security.AccessController;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.eclipse.ecf.core.ISharedObject;
+import org.eclipse.ecf.core.ISharedObjectConnector;
+import org.eclipse.ecf.core.ISharedObjectContainerTransaction;
+import org.eclipse.ecf.core.ISharedObjectManager;
+import org.eclipse.ecf.core.SharedObjectAddException;
+import org.eclipse.ecf.core.SharedObjectConnectException;
+import org.eclipse.ecf.core.SharedObjectCreateException;
+import org.eclipse.ecf.core.SharedObjectDescription;
+import org.eclipse.ecf.core.SharedObjectDisconnectException;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.AbstractFactory;
+import org.eclipse.ecf.core.util.QueueEnqueue;
+
+/**
+ *
+ */
+public class SOManager implements ISharedObjectManager {
+
+ SOContainer container = null;
+ Vector connectors = null;
+
+ public SOManager(SOContainer cont) {
+ super();
+ this.container = cont;
+ connectors = new Vector();
+ }
+
+ protected void addConnector(ISharedObjectConnector conn) {
+ connectors.add(conn);
+ }
+ protected boolean removeConnector(ISharedObjectConnector conn) {
+ return connectors.remove(conn);
+ }
+ protected List getConnectors() {
+ return connectors;
+ }
+ protected Class[] getArgTypes(String[] argTypes, Object[] args,
+ ClassLoader cl) throws ClassNotFoundException {
+ return AbstractFactory.getClassesForTypes(argTypes, args, cl);
+ }
+
+ protected ISharedObject makeSharedObjectInstance(final Class newClass,
+ final Class[] argTypes, final Object[] args) throws Exception {
+ Object newObject = null;
+ try {
+ newObject = AccessController
+ .doPrivileged(new PrivilegedExceptionAction() {
+ public Object run() throws Exception {
+ Constructor aConstructor = newClass
+ .getConstructor(argTypes);
+ aConstructor.setAccessible(true);
+ return aConstructor.newInstance(args);
+ }
+ });
+ } catch (java.security.PrivilegedActionException e) {
+ throw e.getException();
+ }
+ return verifySharedObject(newObject);
+ }
+ protected ISharedObject verifySharedObject(Object newSharedObject) {
+ if (newSharedObject instanceof ISharedObject)
+ return (ISharedObject) newSharedObject;
+ else
+ throw new ClassCastException("shared object "
+ + newSharedObject.toString() + " does not implement "
+ + ISharedObject.class.getName());
+ }
+ protected ISharedObject loadSharedObject(SharedObjectDescription sd)
+ throws Exception {
+ // First get classloader
+ ClassLoader cl = container.getClassLoaderForSharedObject(sd);
+ // Then get args array from properties
+ Object[] args = container.getArgsFromProperties(sd);
+ // And arg types
+ String[] types = container.getArgTypesFromProperties(sd);
+ Class[] argTypes = getArgTypes(types, args, cl);
+ // Now load top-level class
+ final Class newClass = Class.forName(sd.getClassname(), true, cl);
+ return makeSharedObjectInstance(newClass, argTypes, args);
+ }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectManager#getSharedObjectIDs()
+ */
+ public ID[] getSharedObjectIDs() {
+ return container.getSharedObjectIDs();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectManager#createSharedObject(org.eclipse.ecf.core.SharedObjectDescription,
+ * org.eclipse.ecf.core.ISharedObjectContainerTransaction)
+ */
+ public ID createSharedObject(SharedObjectDescription sd,
+ ISharedObjectContainerTransaction trans)
+ throws SharedObjectCreateException {
+ ISharedObject newObject = null;
+ Throwable t = null;
+ ID result = sd.getID();
+ try {
+ newObject = loadSharedObject(sd);
+ return addSharedObject(result, newObject, sd.getProperties(), trans);
+ } catch (Exception e) {
+ throw new SharedObjectCreateException(t);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectManager#addSharedObject(org.eclipse.ecf.core.identity.ID,
+ * org.eclipse.ecf.core.ISharedObject, java.util.Map,
+ * org.eclipse.ecf.core.ISharedObjectContainerTransaction)
+ */
+ public ID addSharedObject(ID sharedObjectID, ISharedObject sharedObject,
+ Map properties, ISharedObjectContainerTransaction trans)
+ throws SharedObjectAddException {
+ Throwable t = null;
+ ID result = sharedObjectID;
+ try {
+ ISharedObject so = sharedObject;
+ SharedObjectDescription sd = new SharedObjectDescription(
+ sharedObject.getClass().getClassLoader(), sharedObjectID,
+ container.getID(), sharedObject.getClass().getName(),
+ properties, 0);
+ container.addSharedObjectAndWait(sd, so, trans);
+ } catch (Exception except) {
+ throw new SharedObjectAddException(except);
+ }
+ return result;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectManager#getSharedObject(org.eclipse.ecf.core.identity.ID)
+ */
+ public ISharedObject getSharedObject(ID sharedObjectID) {
+ return container.getSharedObject(sharedObjectID);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectManager#removeSharedObject(org.eclipse.ecf.core.identity.ID)
+ */
+ public ISharedObject removeSharedObject(ID sharedObjectID) {
+ return container.removeSharedObject(sharedObjectID);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectManager#connectSharedObjects(org.eclipse.ecf.core.identity.ID,
+ * org.eclipse.ecf.core.identity.ID[])
+ */
+ public ISharedObjectConnector connectSharedObjects(ID sharedObjectFrom,
+ ID[] sharedObjectsTo) throws SharedObjectConnectException {
+ if (sharedObjectFrom == null)
+ throw new SharedObjectConnectException("sender cannot be null");
+ if (sharedObjectsTo == null)
+ throw new SharedObjectConnectException("receivers cannot be null");
+ ISharedObjectConnector result = null;
+ synchronized (container.getGroupMembershipLock()) {
+ // Get from to make sure it's there
+ SOWrapper wrap = container.getSharedObjectWrapper(sharedObjectFrom);
+ if (wrap == null)
+ throw new SharedObjectConnectException("sender object "
+ + sharedObjectFrom.getName() + " not found");
+ QueueEnqueue[] queues = new QueueEnqueue[sharedObjectsTo.length];
+ for (int i = 0; i < sharedObjectsTo.length; i++) {
+ SOWrapper w = container
+ .getSharedObjectWrapper(sharedObjectsTo[i]);
+ if (w == null)
+ throw new SharedObjectConnectException("receiver object "
+ + sharedObjectsTo[i].getName() + " not found");
+ queues[i] = new QueueEnqueueImpl(w.getQueue());
+ }
+ // OK now we've got ids and wrappers, make a connector
+ result = new SOConnector(sharedObjectFrom, sharedObjectsTo, queues);
+ }
+ return result;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectManager#disconnectSharedObjects(org.eclipse.ecf.core.ISharedObjectConnector)
+ */
+ public void disconnectSharedObjects(ISharedObjectConnector connector)
+ throws SharedObjectDisconnectException {
+ if (connector == null)
+ throw new SharedObjectDisconnectException("connect cannot be null");
+ if (!removeConnector(connector)) {
+ throw new SharedObjectDisconnectException("connector " + connector
+ + " not found");
+ }
+ connector.dispose();
+ }
+ protected void dispose() {
+ for (Enumeration e = connectors.elements(); e.hasMoreElements();) {
+ ISharedObjectConnector conn = (ISharedObjectConnector) e
+ .nextElement();
+ conn.dispose();
+ }
+ connectors.clear();
+ }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObjectManager#getSharedObjectConnectors(org.eclipse.ecf.core.identity.ID)
+ */
+ public List getSharedObjectConnectors(ID sharedObjectFrom) {
+ List results = new ArrayList();
+ for (Enumeration e = connectors.elements(); e.hasMoreElements();) {
+ ISharedObjectConnector conn = (ISharedObjectConnector) e
+ .nextElement();
+ if (sharedObjectFrom.equals(conn.getSender())) {
+ results.add(conn);
+ }
+ }
+ return results;
+ }
+
+ public static Class[] getClassesForTypes(String[] argTypes, Object[] args,
+ ClassLoader cl) throws ClassNotFoundException {
+ Class clazzes[] = null;
+ if (args == null || args.length == 0)
+ clazzes = new Class[0];
+ else if (argTypes != null) {
+ clazzes = new Class[argTypes.length];
+ for (int i = 0; i < argTypes.length; i++) {
+ clazzes[i] = Class.forName(argTypes[i], true, cl);
+ }
+ } else {
+ clazzes = new Class[args.length];
+ for (int i = 0; i < args.length; i++) {
+ if (args[i] == null)
+ clazzes[i] = null;
+ else
+ clazzes[i] = args[i].getClass();
+ }
+ }
+ return clazzes;
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java
new file mode 100644
index 000000000..93b4a17ce
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java
@@ -0,0 +1,294 @@
+package org.eclipse.ecf.provider.generic;
+
+import java.io.Serializable;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import org.eclipse.ecf.core.ISharedObject;
+import org.eclipse.ecf.core.SharedObjectInitException;
+import org.eclipse.ecf.core.events.SharedObjectActivatedEvent;
+import org.eclipse.ecf.core.events.SharedObjectContainerDepartedEvent;
+import org.eclipse.ecf.core.events.SharedObjectContainerJoinedEvent;
+import org.eclipse.ecf.core.events.SharedObjectDeactivatedEvent;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.AsynchResult;
+import org.eclipse.ecf.core.util.Event;
+import org.eclipse.ecf.core.util.SimpleQueueImpl;
+import org.eclipse.ecf.provider.Debug;
+import org.eclipse.ecf.provider.generic.gmm.Member;
+
+final class SOWrapper {
+ static Debug debug = Debug.create(SOWrapper.class.getName());
+
+ protected ISharedObject sharedObject;
+ private SOConfig sharedObjectConfig;
+ private ID sharedObjectID;
+ private ID sharedObjectHomeID;
+ private SOContainer container;
+ private ID containerID;
+ private Thread thread;
+ private SimpleQueueImpl queue;
+
+ SOWrapper(SOContainer.LoadingSharedObject obj, SOContainer cont) {
+ sharedObjectID = obj.getID();
+ sharedObjectHomeID = obj.getHomeID();
+ sharedObject = obj;
+ container = cont;
+ containerID = cont.getID();
+ sharedObjectConfig = null;
+ thread = null;
+ queue = new SimpleQueueImpl();
+ }
+ SOWrapper(SOConfig aConfig, ISharedObject obj, SOContainer cont) {
+ sharedObjectConfig = aConfig;
+ sharedObjectID = sharedObjectConfig.getSharedObjectID();
+ sharedObjectHomeID = sharedObjectConfig.getHomeContainerID();
+ sharedObject = obj;
+ container = cont;
+ containerID = cont.getID();
+ thread = null;
+ queue = new SimpleQueueImpl();
+ }
+
+ void init() throws SharedObjectInitException {
+ sharedObject.init(sharedObjectConfig);
+ }
+ ID getObjID() {
+ return sharedObjectConfig.getSharedObjectID();
+ }
+
+ ID getHomeID() {
+ return sharedObjectConfig.getHomeContainerID();
+ }
+
+ void activated(ID[] ids) {
+ // First, make space reference accessible to use by RepObject
+ sharedObjectConfig.makeActive(new QueueEnqueueImpl(queue));
+ thread = (Thread) AccessController.doPrivileged(new PrivilegedAction() {
+ public Object run() {
+ // Get thread instance
+ Thread aThread = getThread();
+ return aThread;
+ }
+ });
+ thread.start();
+ send(new SharedObjectActivatedEvent(containerID, sharedObjectID, ids));
+ container.notifySharedObjectActivated(sharedObjectID);
+
+ }
+ void deactivated() {
+ send(new SharedObjectDeactivatedEvent(containerID, sharedObjectID));
+ container.notifySharedObjectDeactivated(sharedObjectID);
+ destroyed();
+ }
+ private void destroyed() {
+ if (!queue.isStopped()) {
+ sharedObjectConfig.makeInactive();
+ // Enqueue destroy message on our RepObject's queue
+ if (thread != null)
+ queue.enqueue(new DisposeEvent());
+ // Close queue...RepObject will receive no more messages from this
+ // point on.
+ queue.close();
+ }
+
+ }
+ void otherChanged(ID otherID, boolean activated) {
+ if (activated && thread != null) {
+ send(new SharedObjectActivatedEvent(containerID, otherID, null));
+ } else {
+ send(new SharedObjectDeactivatedEvent(containerID, otherID));
+ }
+ }
+ void memberChanged(Member m, boolean add) {
+ if (thread != null) {
+ if (add) {
+ send(new SharedObjectContainerJoinedEvent(containerID, m
+ .getID()));
+ } else {
+ send(new SharedObjectContainerDepartedEvent(containerID, m
+ .getID()));
+ }
+ }
+ }
+ Thread getThread() {
+ // Get new thread instance from space.
+ return container.getNewSharedObjectThread(sharedObjectID,
+ new Runnable() {
+ public void run() {
+ if (Debug.ON && debug != null) {
+ debug.msg("Starting runner for " + sharedObjectID);
+ }
+ // The debug class will associate this thread with
+ // container
+ Debug.setThreadDebugGroup(container.getID());
+ // Then process messages on queue until interrupted or
+ // queue closed
+ //Msg aMsg = null;
+ Event evt = null;
+ for (;;) {
+ // make sure the thread hasn't been interrupted and
+ // get Msg from SimpleQueueImpl
+ if (Thread.currentThread().isInterrupted())
+ break;
+
+ evt = (Event) queue.dequeue();
+ if (Thread.currentThread().isInterrupted()
+ || evt == null)
+ break;
+
+ try {
+ if (evt instanceof ProcEvent) {
+ SOWrapper.this.svc(((ProcEvent) evt)
+ .getEvent());
+ } else if (evt instanceof DisposeEvent) {
+ SOWrapper.this.doDestroy();
+ }
+ } catch (Throwable t) {
+ if (Debug.ON && debug != null) {
+ debug.dumpStack(t,
+ "Exception executing event " + evt
+ + " on meta " + this);
+ }
+ handleRuntimeException(t);
+ }
+ }
+ // If the thread was interrupted, then show appropriate
+ // spam
+ if (Thread.currentThread().isInterrupted()) {
+ if (Debug.ON && debug != null) {
+ debug
+ .msg("Runner for "
+ + sharedObjectID
+ + " terminating after being interrupted");
+ }
+ } else {
+ if (Debug.ON && debug != null) {
+ debug.msg("Runner for " + sharedObjectID
+ + " terminating normally");
+ }
+ }
+ }
+ });
+ }
+ private void send(Event evt) {
+ queue.enqueue(new ProcEvent(evt));
+ }
+
+ protected static class ProcEvent implements Event {
+ Event theEvent = null;
+ ProcEvent(Event event) {
+ theEvent = event;
+ }
+ Event getEvent() {
+ return theEvent;
+ }
+ }
+ protected static class DisposeEvent implements Event {
+ DisposeEvent() {
+ }
+ }
+ void svc(Event evt) {
+ sharedObject.handleEvent(evt);
+ }
+ void doDestroy() {
+ sharedObject.dispose(containerID);
+ }
+
+ //void createMsgResp(ID fromID, ContainerMessage.CreateResponse resp) {
+ /*
+ * if (sharedObjectConfig.getMsgMask().get(MsgMask.CREATERESPONSE) && thread !=
+ * null) { send( Msg.makeMsg( null, CREATE_RESP_RCVD, fromID, resp.myExcept,
+ * new Long(resp.mySeq))); }
+ */
+ //}
+ void deliverObjectFromRemote(ID fromID, Serializable data) {
+ // If we have a container, forward message onto container
+ /*
+ * if (myContainerID != null) { forwardToContainer( Msg.makeMsg(null,
+ * REMOTE_REPOBJ_MSG, fromID, data)); // otherwise, send to our object
+ * (assuming it has thread and that it wants to receive message) } else
+ * if ( sharedObjectConfig.getMsgMask().get(MsgMask.REMOTEDATA) &&
+ * thread != null) { send(Msg.makeMsg(null, REMOTE_REPOBJ_MSG, fromID,
+ * data)); }
+ */
+ }
+
+ void forwardToContainer(Event msg) {
+ /*
+ * try { container.deliverForwardToRepObject(sharedObjectID,
+ * myContainerID, msg); } catch (Exception e) {
+ * handleRuntimeException(e); }
+ */
+ }
+ void deliverEventFromSharedObject(ID fromID, Event evt) {
+ /*
+ * if (myContainerID != null) { forwardToContainer(Msg.makeMsg(null,
+ * REPOBJ_MSG, fromID, msg)); // otherwise, send to our object (assuming
+ * it has thread and that it wants to receive message) } else if (
+ * sharedObjectConfig.getMsgMask().get(MsgMask.REPOBJMSG) && thread !=
+ * null) { send(Msg.makeMsg(null, REPOBJ_MSG, fromID, msg)); }
+ */
+ }
+ void deliverRequestFromRepObject(ID fromID, Event evt, AsynchResult future) {
+ /*
+ * if (myContainerID != null) { forwardToContainer( Msg.makeMsg(null,
+ * REPOBJ_REQ, fromID, msg, future)); } else if (
+ * sharedObjectConfig.getMsgMask().get(MsgMask.REPOBJMSG) && thread !=
+ * null) { // Check to see that messages may be received...determined by
+ * the REPOBJMSG // bit in msg mask send(Msg.makeMsg(null, REPOBJ_REQ,
+ * fromID, msg, future)); }
+ */
+ }
+ void deliverForwardedMsg(ID fromID, Event evt) {
+ /*
+ * if (myContainerID != null) { forwardToContainer(Msg.makeMsg(null,
+ * REPOBJ_FOR, fromID, msg)); } else if (
+ * sharedObjectConfig.getMsgMask().get(MsgMask.REPOBJMSG) && thread !=
+ * null) { send(Msg.makeMsg(null, REPOBJ_FOR, fromID, msg)); }
+ */
+ }
+ void deliverRemoteMessageFailed(ID toID, Serializable object, Throwable e) {
+ /*
+ * if (sharedObjectConfig.getMsgMask().get(MsgMask.REPOBJMSG) && thread !=
+ * null) { send(Msg.makeMsg(null, REMOTE_REPOBJ_MSG_FAILED, toID,
+ * object, e)); }
+ */
+ }
+
+ void destroySelf() {
+ /*
+ * if (thread != null) { send(Msg.makeMsg(null, REPOBJ_DESTROY_SELF)); }
+ */
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("SharedObjectWrapper[").append(getObjID()).append("]");
+ return sb.toString();
+ }
+ void handleRuntimeException(Throwable except) {
+ if (Debug.ON && debug != null) {
+ debug.dumpStack(except, "handleRuntimeException called for "
+ + sharedObjectID);
+ }
+ try {
+ Debug.errDumpStack(except, "handleRuntimeException called for "
+ + sharedObjectID);
+ } catch (Throwable e) {
+ }
+ }
+ /**
+ * @return
+ */
+ protected ISharedObject getSharedObject() {
+ return sharedObject;
+ }
+ /**
+ * @return
+ */
+ public SimpleQueueImpl getQueue() {
+ return queue;
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java
new file mode 100644
index 000000000..0c3e9c5d0
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java
@@ -0,0 +1,230 @@
+package org.eclipse.ecf.provider.generic;
+
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.io.Serializable;
+import java.net.ConnectException;
+import java.net.Socket;
+
+import org.eclipse.ecf.core.ISharedObjectContainerConfig;
+import org.eclipse.ecf.core.SharedObjectContainerJoinException;
+import org.eclipse.ecf.core.comm.IAsynchConnection;
+import org.eclipse.ecf.core.comm.ISynchAsynchConnection;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.provider.generic.gmm.Member;
+
+public class ServerSOContainer extends SOContainer {
+
+ public ServerSOContainer(ISharedObjectContainerConfig config) {
+ super(config);
+ }
+ public boolean isGroupServer() {
+ return true;
+ }
+ public boolean isGroupManager() {
+ return true;
+ }
+ public ID getGroupID() {
+ return getID();
+ }
+ protected void queueContainerMessage(ContainerMessage message)
+ throws IOException {
+ if (message.getToContainerID() == null) {
+ queueToAll(message);
+ } else {
+ IAsynchConnection conn = getConnectionForID(message
+ .getToContainerID());
+ if (conn != null)
+ conn.sendAsynch(message.getToContainerID(),
+ getBytesForObject(message));
+ }
+ }
+ protected void forwardToRemote(ID from, ID to, ContainerMessage data)
+ throws IOException {
+ queueContainerMessage(new ContainerMessage(from, to,
+ getNextSequenceNumber(), data.getData()));
+ }
+ protected void forwardExcluding(ID from, ID excluding, ContainerMessage data)
+ throws IOException {
+ if (excluding == null) {
+ queueContainerMessage(new ContainerMessage(from, null,
+ getNextSequenceNumber(), data.getData()));
+ } else {
+ Object ms[] = groupManager.getMembers();
+ for (int i = 0; i < ms.length; i++) {
+ Member m = (Member) ms[i];
+ ID oldID = m.getID();
+ if (!excluding.equals(oldID) && !from.equals(oldID)) {
+ IAsynchConnection conn = (IAsynchConnection) m.getData();
+ if (conn != null) {
+ try {
+ conn.sendAsynch(oldID,
+ getBytesForObject(new ContainerMessage(
+ from, oldID,
+ getNextSequenceNumber(), data
+ .getData())));
+ } catch (IOException e) {
+ // XXX log this
+ }
+ }
+ }
+ }
+ }
+ }
+ protected void handleChangeMsg(ID fromID, ID toID, long seqNum,
+ Serializable data) throws IOException {
+ // Server should never receive change messages
+ }
+
+ public void joinGroup(ID group, Object data)
+ throws SharedObjectContainerJoinException {
+ SharedObjectContainerJoinException e = new SharedObjectContainerJoinException(
+ "Server cannot join group " + group.getName());
+ throw e;
+ }
+ public void leaveGroup() {
+ ejectAllGroupMembers();
+ }
+
+ protected ContainerMessage acceptNewClient(Socket socket, String target,
+ Serializable data, ISynchAsynchConnection conn) {
+ try {
+ ContainerMessage mess = (ContainerMessage) data;
+ if (mess == null)
+ throw new InvalidObjectException("container message is null");
+ ID remoteID = mess.getFromContainerID();
+ if (remoteID == null)
+ throw new InvalidObjectException("remote id is null");
+
+ ContainerMessage.JoinGroupMessage jgm = (ContainerMessage.JoinGroupMessage) mess
+ .getData();
+ if (jgm == null)
+ throw new IOException("join group message is null");
+ ID memberIDs[] = null;
+
+ synchronized (getGroupMembershipLock()) {
+ if (isClosing) {
+ Exception e = new InvalidObjectException(
+ "container is closing");
+ throw e;
+ }
+
+ if (addNewRemoteMember(remoteID, conn)) {
+ // Notify existing remotes about new member
+ try {
+ forwardExcluding(getID(), remoteID, ContainerMessage
+ .makeViewChangeMessage(getID(), remoteID,
+ getNextSequenceNumber(),
+ new ID[] { remoteID }, true, null));
+ } catch (IOException e) {
+ }
+ // Get current membership
+ memberIDs = groupManager.getMemberIDs();
+ // Start messaging to new member
+ conn.start();
+ } else {
+ ConnectException e = new ConnectException(
+ "server refused connection");
+ throw e;
+ }
+ }
+ return ContainerMessage.makeViewChangeMessage(getID(), remoteID,
+ getNextSequenceNumber(), memberIDs, true, null);
+ } catch (Exception e) {
+ // XXX Log this
+
+ // And then return null...which means refusal
+ return null;
+ }
+ }
+
+ protected Serializable getConnectDataFromInput(Serializable input)
+ throws Exception {
+ return input;
+ }
+
+ protected Object checkJoin(String hostname, ID id, Serializable data)
+ throws Exception {
+ return null;
+ }
+
+ protected void memberLeave(ID leaveID, IAsynchConnection conn) {
+ if (removeRemoteMember(leaveID)) {
+ try {
+ forwardExcluding(getID(), leaveID, ContainerMessage
+ .makeViewChangeMessage(getID(), leaveID,
+ getNextSequenceNumber(), new ID[] { leaveID },
+ false, null));
+ } catch (IOException e) {
+ }
+ }
+ killConnection(conn);
+ }
+
+ public void ejectGroupMember(ID memberID) {
+ IAsynchConnection conn = null;
+ synchronized (getGroupMembershipLock()) {
+ conn = getConnectionForID(memberID);
+ if (conn == null)
+ return;
+ try {
+ conn.sendAsynch(
+
+ memberID, getBytesForObject(ContainerMessage
+ .makeLeaveGroupMessage(getID(), memberID,
+ getNextSequenceNumber(), null)));
+ } catch (Exception e) {
+ }
+ memberLeave(memberID, conn);
+ }
+ }
+
+ public void ejectAllGroupMembers() {
+ synchronized (getGroupMembershipLock()) {
+ Object[] members = groupManager.getMembers();
+ for (int i = 0; i < members.length; i++) {
+ ejectGroupMember(((Member) members[i]).getID());
+ }
+ }
+ }
+
+ // Support methods
+ protected ID getIDForConnection(IAsynchConnection conn) {
+ Object ms[] = groupManager.getMembers();
+ for (int i = 0; i < ms.length; i++) {
+ Member m = (Member) ms[i];
+ if (conn == (IAsynchConnection) m.getData())
+ return m.getID();
+ }
+ return null;
+ }
+ protected IAsynchConnection getConnectionForID(ID memberID) {
+ Member mem = groupManager.getMemberForID(memberID);
+ if (mem == null)
+ return null;
+ return (IAsynchConnection) mem.getData();
+ }
+
+ private final void queueToAll(ContainerMessage message) {
+ Object[] members = groupManager.getMembers();
+ for (int i = 0; i < members.length; i++) {
+ IAsynchConnection conn = (IAsynchConnection) ((Member) members[i])
+ .getData();
+ if (conn != null) {
+ try {
+ conn.sendAsynch(message.getToContainerID(),
+ getBytesForObject(message));
+ } catch (IOException e) {
+ // XXX report
+ }
+ }
+ }
+ }
+
+ public void dispose(long timeout) {
+ // For servers, we'll eject all members
+ ejectAllGroupMembers();
+ super.dispose(timeout);
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java
new file mode 100644
index 000000000..597cdb0ae
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java
@@ -0,0 +1,43 @@
+package org.eclipse.ecf.provider.generic;
+
+import org.eclipse.ecf.core.ISharedObjectContainerConfig;
+import org.eclipse.ecf.core.comm.ConnectionFactory;
+import org.eclipse.ecf.core.comm.ConnectionInstantiationException;
+import org.eclipse.ecf.core.comm.ISynchAsynchConnection;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.identity.IDFactory;
+
+public class TCPClientSOContainer extends ClientSOContainer {
+ int keepAlive = 0;
+
+ public static final String DEFAULT_COMM_NAME = "tcpclient";
+
+ public TCPClientSOContainer(ISharedObjectContainerConfig config) {
+ super(config);
+ }
+
+ public TCPClientSOContainer(ISharedObjectContainerConfig config, int ka) {
+ super(config);
+ keepAlive = ka;
+ }
+
+ protected ISynchAsynchConnection getClientConnection(ID remoteSpace,
+ Object data) throws ConnectionInstantiationException {
+
+ Object[] args = { new Integer(keepAlive) };
+ ISynchAsynchConnection conn = null;
+ conn = ConnectionFactory.makeSynchAsynchConnection(receiver, DEFAULT_COMM_NAME, args);
+ return conn;
+ }
+
+ public static final void main(String[] args) throws Exception {
+ ISharedObjectContainerConfig config = new SOContainerConfig(IDFactory
+ .makeGUID());
+ TCPClientSOContainer container = new TCPClientSOContainer(config);
+ // now join group
+ ID serverID = IDFactory.makeStringID(TCPServerSOContainer
+ .getDefaultServerURL());
+ container.joinGroup(serverID, null);
+ Thread.sleep(200000);
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainer.java
new file mode 100644
index 000000000..586a9d1b9
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainer.java
@@ -0,0 +1,97 @@
+package org.eclipse.ecf.provider.generic;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.eclipse.ecf.core.ISharedObjectContainerConfig;
+import org.eclipse.ecf.core.comm.ConnectionRequestHandler;
+import org.eclipse.ecf.core.comm.ISynchAsynchConnection;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.identity.IDFactory;
+
+public class TCPServerSOContainer extends ServerSOContainer implements
+ ConnectionRequestHandler {
+
+ public static final String DEFAULT_PROTOCOL = "ecftcp";
+ public static final int DEFAULT_PORT = 3282;
+ public static final int DEFAULT_KEEPALIVE = 30000;
+ public static final String DEFAULT_NAME = "server";
+ // Keep alive value
+ protected int keepAlive;
+ protected TCPServerSOContainerGroup group;
+
+ protected int getKeepAlive() {
+ return keepAlive;
+ }
+ public static String getServerURL(String host, String name) {
+ return DEFAULT_PROTOCOL + "://" + host + ":" + DEFAULT_PORT + "/"
+ + name;
+ }
+ public static String getDefaultServerURL() {
+ return getServerURL("localhost", DEFAULT_NAME);
+ }
+ public TCPServerSOContainer(ISharedObjectContainerConfig config,
+ TCPServerSOContainerGroup grp, int keepAlive) throws IOException,
+ URISyntaxException {
+ super(config);
+ this.keepAlive = keepAlive;
+ // Make sure URI syntax is followed.
+ URI aURI = new URI(config.getID().getName());
+ int urlPort = aURI.getPort();
+ if (group == null) {
+ this.group = new TCPServerSOContainerGroup(urlPort);
+ this.group.putOnTheAir();
+ } else
+ this.group = grp;
+ String path = aURI.getPath();
+ group.add(path, this);
+ }
+
+ public TCPServerSOContainer(ISharedObjectContainerConfig config,
+ TCPServerSOContainerGroup listener, String path, int keepAlive) {
+ super(config);
+ initialize(listener, path, keepAlive);
+ }
+ protected void initialize(TCPServerSOContainerGroup listener, String path,
+ int keepAlive) {
+ this.keepAlive = keepAlive;
+ this.group = listener;
+ this.group.add(path, this);
+ }
+ public void dispose(long timeout) {
+ URI aURI = null;
+ try {
+ aURI = new URI(getID().getName());
+ } catch (Exception e) {
+ // Should never happen
+ }
+ if (aURI != null)
+ group.remove(aURI.getPath());
+ group = null;
+ super.dispose(timeout);
+ }
+ public TCPServerSOContainer(ISharedObjectContainerConfig config)
+ throws IOException, URISyntaxException {
+ this(config, null, DEFAULT_PORT);
+ }
+ public Serializable checkConnect(Socket socket, String target,
+ Serializable data, ISynchAsynchConnection conn) {
+ return acceptNewClient(socket, target, data, conn);
+ }
+
+ protected Serializable getConnectDataFromInput(Serializable input)
+ throws Exception {
+ return input;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ID server = IDFactory.makeStringID(getDefaultServerURL());
+ TCPServerSOContainer cont = new TCPServerSOContainer(
+ new SOContainerConfig(server));
+
+ Thread.sleep(3000000);
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java
new file mode 100644
index 000000000..865ff914f
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java
@@ -0,0 +1,140 @@
+package org.eclipse.ecf.provider.generic;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.Socket;
+import java.net.URI;
+
+import org.eclipse.ecf.provider.Debug;
+import org.eclipse.ecf.core.comm.ConnectionRequestHandler;
+import org.eclipse.ecf.provider.comm.tcp.Client;
+import org.eclipse.ecf.provider.comm.tcp.ConnectRequestMessage;
+import org.eclipse.ecf.provider.comm.tcp.ConnectResultMessage;
+import org.eclipse.ecf.provider.comm.tcp.ExObjectInputStream;
+import org.eclipse.ecf.provider.comm.tcp.ExObjectOutputStream;
+import org.eclipse.ecf.provider.comm.tcp.ISocketAcceptHandler;
+import org.eclipse.ecf.provider.comm.tcp.Server;
+
+public class TCPServerSOContainerGroup extends SOContainerGroup implements
+ ISocketAcceptHandler {
+
+ public static final String INVALID_CONNECT = "Invalid connect request. ";
+
+ public static final Debug debug = Debug
+ .create(TCPServerSOContainerGroup.class.getName());
+ public static final String DEFAULT_GROUP_NAME = TCPServerSOContainerGroup.class
+ .getName();
+
+ protected int port;
+ Server listener;
+ boolean isOnTheAir = false;
+ ThreadGroup threadGroup;
+
+ public TCPServerSOContainerGroup(String name, ThreadGroup group, int port) {
+ super(name);
+ threadGroup = group;
+ this.port = port;
+ }
+
+ public TCPServerSOContainerGroup(String name, int port) {
+ this(name, null, port);
+ }
+
+ public TCPServerSOContainerGroup(int port) {
+ this(DEFAULT_GROUP_NAME, null, port);
+ }
+
+ public synchronized void putOnTheAir() throws IOException {
+ if (Debug.ON && debug != null) {
+ debug.msg("Putting group " + this + " on the air.");
+ }
+ listener = new Server(threadGroup, port, this);
+ port = listener.getLocalPort();
+ isOnTheAir = true;
+ }
+
+ public synchronized boolean isOnTheAir() {
+ return isOnTheAir;
+ }
+
+ public void handleAccept(Socket aSocket) throws Exception {
+ ObjectOutputStream oStream = new ExObjectOutputStream(
+ new BufferedOutputStream(aSocket.getOutputStream()));
+ oStream.flush();
+
+ ObjectInputStream iStream = new ExObjectInputStream(aSocket
+ .getInputStream());
+
+ ConnectRequestMessage req = (ConnectRequestMessage) iStream
+ .readObject();
+ if (Debug.ON && debug != null) {
+ debug.msg("Got connect request " + req);
+ }
+ if (req == null)
+ throw new InvalidObjectException(INVALID_CONNECT
+ + "ConnectRequestMessage is null");
+
+ URI uri = req.getTarget();
+ if (uri == null)
+ throw new InvalidObjectException(INVALID_CONNECT
+ + "Target URI is null");
+ String path = uri.getPath();
+ if (path == null)
+ throw new InvalidObjectException(INVALID_CONNECT
+ + "Target path is null");
+
+ TCPServerSOContainer srs = (TCPServerSOContainer) get(path);
+ if (Debug.ON && debug != null) {
+ debug.msg("Found container with " + srs.getID().getName()
+ + " for target " + uri);
+ }
+ if (srs == null)
+ throw new InvalidObjectException("Container for target " + path
+ + " not found!");
+
+ // Create our local messaging interface
+ Client newClient = new Client(aSocket, iStream, oStream, srs
+ .getReceiver(), srs.keepAlive);
+
+ // No other threads can access messaging interface until space has
+ // accepted/rejected
+ // connect request
+ synchronized (newClient) {
+ // Call checkConnect
+ Serializable resp = (Serializable) ((ConnectionRequestHandler) srs)
+ .checkConnect(aSocket, path, req.getData(), newClient);
+ // Create connect response wrapper and send it back
+ oStream.writeObject(new ConnectResultMessage(resp));
+ oStream.flush();
+ }
+ }
+
+ public synchronized void takeOffTheAir() {
+ if (listener != null) {
+ if (Debug.ON && debug != null) {
+ debug.msg("Taking " + getName() + " on the air.");
+ }
+ try {
+ listener.close();
+ } catch (IOException e) {
+ if (Debug.ON && debug != null) {
+ debug.dumpStack(e, "Exception in closeListener");
+ }
+ }
+ listener = null;
+ }
+ isOnTheAir = false;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String toString() {
+ return super.toString() + ";port:" + port;
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/events/SharedObjectCallEvent.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/events/SharedObjectCallEvent.java
new file mode 100644
index 000000000..816828aa4
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/events/SharedObjectCallEvent.java
@@ -0,0 +1,42 @@
+package org.eclipse.ecf.provider.generic.events;
+
+import org.eclipse.ecf.core.events.ISharedObjectCallEvent;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.AsynchResult;
+import org.eclipse.ecf.core.util.Event;
+
+public class SharedObjectCallEvent implements ISharedObjectCallEvent {
+
+ ID sender;
+ Event event;
+ AsynchResult result;
+
+ public SharedObjectCallEvent(ID sender, Event evt, AsynchResult res) {
+ super();
+ this.sender = sender;
+ this.event = evt;
+ this.result = res;
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.ecf.core.events.ISharedObjectCallEvent#getAsynchResult()
+ */
+ public AsynchResult getAsynchResult() {
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.ecf.core.events.ISharedObjectEvent#getSenderSharedObjectID()
+ */
+ public ID getSenderSharedObjectID() {
+ return sender;
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.ecf.core.events.ISharedObjectEvent#getEvent()
+ */
+ public Event getEvent() {
+ return event;
+ }
+
+}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/GMMImpl.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/GMMImpl.java
new file mode 100644
index 000000000..48fd07a23
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/GMMImpl.java
@@ -0,0 +1,80 @@
+package org.eclipse.ecf.provider.generic.gmm;
+
+import java.util.Observable;
+import java.util.TreeSet;
+import java.util.Iterator;
+
+import org.eclipse.ecf.core.identity.ID;
+
+public class GMMImpl extends Observable {
+
+ TreeSet mySet;
+
+ public GMMImpl() {
+ mySet = new TreeSet();
+ }
+
+ public boolean addMember(Member m) {
+ boolean res = mySet.add(m);
+ if (res) {
+ setChanged();
+ notifyObservers(new MemberChanged(m, true));
+ }
+ return res;
+ }
+
+ public boolean removeMember(Member m) {
+ boolean res = mySet.remove(m);
+ if (res) {
+ setChanged();
+ notifyObservers(new MemberChanged(m, false));
+ }
+ return res;
+ }
+
+ public void removeAllMembers() {
+ Object members[] = getMembers();
+ for (int i = 0; i < members.length; i++) {
+ removeMember((Member) members[i]);
+ }
+ }
+
+ public Object[] getMembers() {
+ return mySet.toArray();
+ }
+
+ public ID[] getMemberIDs(ID exclude) {
+ TreeSet newSet = null;
+ if (exclude != null) {
+ newSet = (TreeSet) mySet.clone();
+ newSet.remove(new Member(exclude));
+ } else {
+ newSet = mySet;
+ }
+ ID ids[] = new ID[newSet.size()];
+ Iterator iter = newSet.iterator();
+ int j = 0;
+ while (iter.hasNext()) {
+ ids[j++] = (ID) ((Member) iter.next()).getID();
+ }
+ return ids;
+ }
+
+ public int getSize() {
+ return mySet.size();
+ }
+
+ public boolean containsMember(Member m) {
+ return mySet.contains(m);
+ }
+
+ public Iterator iterator() {
+ return mySet.iterator();
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("GMM").append(mySet);
+ return sb.toString();
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/Member.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/Member.java
new file mode 100644
index 000000000..35dcd7656
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/Member.java
@@ -0,0 +1,53 @@
+package org.eclipse.ecf.provider.generic.gmm;
+
+import org.eclipse.ecf.core.identity.ID;
+
+public class Member implements Comparable {
+
+ ID member;
+ Object data;
+
+ public Member(ID member) {
+ this(member, null);
+ }
+
+ public Member(ID member, Object data) {
+ this.member = member;
+ this.data = data;
+ }
+
+ public boolean equals(Object o) {
+ if (o != null && o instanceof Member) {
+ return member.equals(((Member) o).member);
+ } else
+ return false;
+ }
+
+ public int hashCode() {
+ return member.hashCode();
+ }
+
+ public int compareTo(Object o) {
+ if (o != null && o instanceof Member) {
+ return member.compareTo(((Member) o).member);
+ } else
+ return 0;
+ }
+
+ public ID getID() {
+ return member;
+ }
+
+ public Object getData() {
+ return data;
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Member[").append(member).append(";").append(
+ data).append(
+ "]");
+ return sb.toString();
+ }
+
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/MemberChanged.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/MemberChanged.java
new file mode 100644
index 000000000..3db3ed410
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/MemberChanged.java
@@ -0,0 +1,19 @@
+package org.eclipse.ecf.provider.generic.gmm;
+
+public class MemberChanged {
+ Member member;
+ boolean added;
+
+ public MemberChanged(Member member, boolean added) {
+ this.member = member;
+ this.added = added;
+ }
+
+ public Member getMember() {
+ return member;
+ }
+
+ public boolean getAdded() {
+ return added;
+ }
+} \ No newline at end of file

Back to the top