diff options
| author | slewis | 2004-12-29 06:44:54 +0000 |
|---|---|---|
| committer | slewis | 2004-12-29 06:44:54 +0000 |
| commit | feed39c1ad5ed5d9b34b8020e12a27a23bab7c72 (patch) | |
| tree | c11350dac0a19133196a1ef430db6150d445608a | |
| parent | 78e0981466e30e04369bb909a1cf8aefa14a3ce9 (diff) | |
| download | org.eclipse.ecf-feed39c1ad5ed5d9b34b8020e12a27a23bab7c72.tar.gz org.eclipse.ecf-feed39c1ad5ed5d9b34b8020e12a27a23bab7c72.tar.xz org.eclipse.ecf-feed39c1ad5ed5d9b34b8020e12a27a23bab7c72.zip | |
Initial checkin of org.eclipse.ecf.provider plugin
43 files changed, 4492 insertions, 0 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/.classpath b/framework/bundles/org.eclipse.ecf.provider/.classpath new file mode 100644 index 000000000..065ac06e1 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/.classpath @@ -0,0 +1,7 @@ +<?xml version="1.0" encoding="UTF-8"?> +<classpath> + <classpathentry kind="src" path="src"/> + <classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/> + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> + <classpathentry kind="output" path="bin"/> +</classpath> diff --git a/framework/bundles/org.eclipse.ecf.provider/.cvsignore b/framework/bundles/org.eclipse.ecf.provider/.cvsignore new file mode 100644 index 000000000..ba077a403 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/.cvsignore @@ -0,0 +1 @@ +bin diff --git a/framework/bundles/org.eclipse.ecf.provider/.project b/framework/bundles/org.eclipse.ecf.provider/.project new file mode 100644 index 000000000..0a235b6a0 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/.project @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8"?> +<projectDescription> + <name>org.eclipse.ecf.provider</name> + <comment></comment> + <projects> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments> + </arguments> + </buildCommand> + <buildCommand> + <name>org.eclipse.pde.ManifestBuilder</name> + <arguments> + </arguments> + </buildCommand> + <buildCommand> + <name>org.eclipse.pde.SchemaBuilder</name> + <arguments> + </arguments> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.pde.PluginNature</nature> + <nature>org.eclipse.jdt.core.javanature</nature> + </natures> +</projectDescription> diff --git a/framework/bundles/org.eclipse.ecf.provider/META-INF/MANIFEST.MF b/framework/bundles/org.eclipse.ecf.provider/META-INF/MANIFEST.MF new file mode 100644 index 000000000..532345de5 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/META-INF/MANIFEST.MF @@ -0,0 +1,24 @@ +Manifest-Version: 1.0 +Bundle-Name: Eclipse Communications Framework Provider Plug-in +Bundle-SymbolicName: org.eclipse.ecf.provider; singleton=true +Bundle-Version: 0.0.1 +Bundle-ClassPath: provider.jar +Bundle-Activator: org.eclipse.ecf.provider.ProviderPlugin +Bundle-Vendor: eclipse.org +Bundle-Localization: plugin +Require-Bundle: org.eclipse.core.runtime, + org.eclipse.ecf +Eclipse-AutoStart: true +DynamicImport-Package: * +Provide-Package: org.eclipse.ecf.provider, +org.eclipse.ecf.provider.comm.tcp +org.eclipse.ecf.provider.generic, +org.eclipse.ecf.provider.generic.events +org.eclipse.ecf.provider.generic.gmm +Bundle-SymbolicName: org.eclipse.ecf.provider +Bundle-Version: 0.0.1 +Bundle-Name: ECF - Provider Plugin +Bundle-Vendor: Eclipse.org +Bundle-Activator: org.eclipse.ecf.provider.ProviderPlugin + + diff --git a/framework/bundles/org.eclipse.ecf.provider/build.properties b/framework/bundles/org.eclipse.ecf.provider/build.properties new file mode 100644 index 000000000..4955ccc98 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/build.properties @@ -0,0 +1,4 @@ +source.provider.jar = src/ +output.provider.jar = bin/ +bin.includes = plugin.xml,\ + provider.jar diff --git a/framework/bundles/org.eclipse.ecf.provider/plugin.xml b/framework/bundles/org.eclipse.ecf.provider/plugin.xml new file mode 100644 index 000000000..1b3e5ea30 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/plugin.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<?eclipse version="3.0"?> +<plugin + id="org.eclipse.ecf.provider" + name="ECF Provider Plug-in" + version="0.0.1" + provider-name="eclipse.org" + class="org.eclipse.ecf.provider.ProviderPlugin"> + + <runtime> + <library name="provider.jar"> + <export name="*"/> + </library> + </runtime> + + <requires> + <import plugin="org.eclipse.core.runtime"/> + <import plugin="org.eclipse.ecf"/> + </requires> + <extension + point="org.eclipse.ecf.containerFactory"> + <containerFactory + class="org.eclipse.ecf.provider.generic.ContainerInstantiator" + description="Generic container implementation (server and client)" + name="generic"/> + </extension> + <extension + point="org.eclipse.ecf.comm"> + <connection + description="TCP Client Connection" + name="tcpclient" + instantiatorClass="org.eclipse.ecf.provider.comm.tcp.Client$Creator"/> + </extension> + +</plugin> diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Debug.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Debug.java new file mode 100644 index 000000000..497eed1d3 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Debug.java @@ -0,0 +1,28 @@ +package org.eclipse.ecf.provider; + +public class Debug { + + public static boolean ON = false; + + public static Debug create(String key) { + return new Debug(key); + } + + public static void errDumpStack(Throwable e, String msg) { + System.err.println(msg); + e.printStackTrace(System.err); + } + public void dumpStack(Throwable e, String msg) { + System.err.println(msg); + e.printStackTrace(System.err); + } + public void msg(String msg) { + System.err.println(msg); + } + protected Debug(String str) { + } + public static void setThreadDebugGroup(Object obj) { + // Do nothing + } + +} diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/ProviderPlugin.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/ProviderPlugin.java new file mode 100644 index 000000000..ad41dba1f --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/ProviderPlugin.java @@ -0,0 +1,69 @@ +package org.eclipse.ecf.provider; + +import org.eclipse.core.runtime.Plugin; +import org.osgi.framework.BundleContext; +import java.util.*; + +/** + * The main plugin class to be used in the desktop. + */ +public class ProviderPlugin extends Plugin { + //The shared instance. + private static ProviderPlugin plugin; + //Resource bundle. + private ResourceBundle resourceBundle; + + /** + * The constructor. + */ + public ProviderPlugin() { + super(); + plugin = this; + try { + resourceBundle = ResourceBundle.getBundle("org.eclipse.ecf.provider.ProviderPluginResources"); + } catch (MissingResourceException x) { + resourceBundle = null; + } + } + + /** + * This method is called upon plug-in activation + */ + public void start(BundleContext context) throws Exception { + super.start(context); + } + + /** + * This method is called when the plug-in is stopped + */ + public void stop(BundleContext context) throws Exception { + super.stop(context); + } + + /** + * Returns the shared instance. + */ + public static ProviderPlugin getDefault() { + return plugin; + } + + /** + * Returns the string from the plugin's resource bundle, + * or 'key' if not found. + */ + public static String getResourceString(String key) { + ResourceBundle bundle = ProviderPlugin.getDefault().getResourceBundle(); + try { + return (bundle != null) ? bundle.getString(key) : key; + } catch (MissingResourceException e) { + return key; + } + } + + /** + * Returns the plugin's resource bundle, + */ + public ResourceBundle getResourceBundle() { + return resourceBundle; + } +} diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java new file mode 100644 index 000000000..b7d20717c --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java @@ -0,0 +1,18 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.Serializable; + +public class AsynchMessage implements Serializable { + + Serializable data; + + protected AsynchMessage() { + } + + protected AsynchMessage(Serializable data) { + this.data = data; + } + Serializable getData() { + return data; + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java new file mode 100644 index 000000000..8ad25edf7 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java @@ -0,0 +1,563 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.net.ConnectException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Enumeration; +import java.util.Map; +import java.util.Properties; +import java.util.Vector; + +import org.eclipse.ecf.core.comm.AsynchConnectionEvent; +import org.eclipse.ecf.core.comm.ConnectionEvent; +import org.eclipse.ecf.core.comm.ConnectionInstantiationException; +import org.eclipse.ecf.core.comm.DisconnectConnectionEvent; +import org.eclipse.ecf.core.comm.IConnectionEventHandler; +import org.eclipse.ecf.core.comm.ISynchAsynchConnection; +import org.eclipse.ecf.core.comm.ISynchAsynchConnectionEventHandler; +import org.eclipse.ecf.core.comm.SynchConnectionEvent; +import org.eclipse.ecf.core.comm.provider.ISynchAsynchConnectionInstantiator; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.identity.IDFactory; +import org.eclipse.ecf.core.util.SimpleQueueImpl; +import org.eclipse.ecf.provider.Debug; + +public final class Client implements ISynchAsynchConnection { + + public static class Creator implements ISynchAsynchConnectionInstantiator { + public ISynchAsynchConnection makeInstance(ISynchAsynchConnectionEventHandler handler, Class[] clazzes, + Object[] args) throws ConnectionInstantiationException { + try { + Integer ka = new Integer(0); + if (args.length > 0) { + ka = (Integer) args[0]; + } + return new Client(handler, + ka); + } catch (RuntimeException e) { + throw new ConnectionInstantiationException( + "Exception in creating connection "+Client.class.getName(), e); + } + } + + } + public static final String PROTOCOL = "ecftcp"; + + public static final Debug debug = Debug.create(Client.class.getName()); + + public static final int SNDR_PRIORITY = Thread.NORM_PRIORITY; + public static final int RCVR_PRIORITY = Thread.NORM_PRIORITY; + // Default close timeout is 1.5 seconds + public static final long CLOSE_TIMEOUT = 1500; + + public static final int DEF_MAX_MSG = 50; + + protected String address; + protected int port; + protected Socket socket; + protected ObjectOutputStream outputStream; + protected ObjectInputStream inputStream; + + protected ISynchAsynchConnectionEventHandler handler; + protected SimpleQueueImpl queue = new SimpleQueueImpl(); + protected int keepAlive = 0; + + protected Thread sendThread; + protected Thread rcvThread; + protected Thread keepAliveThread; + + protected boolean isClosing = false; + protected boolean waitForPing = false; + protected PingMessage ping = new PingMessage(); + protected PingResponseMessage pingResp = new PingResponseMessage(); + protected long nextPingTime; + + protected int maxMsg = DEF_MAX_MSG; + protected long closeTimeout = CLOSE_TIMEOUT; + + protected Vector eventNotify = null; + + protected Map properties; + + public Map getProperties() { + return properties; + } + public void setProperties(Map props) { + this.properties = props; + } + public Client(Socket aSocket, ObjectInputStream iStream, + ObjectOutputStream oStream, + ISynchAsynchConnectionEventHandler handler, int keepAlive) + throws IOException { + this(aSocket, iStream, oStream, handler, keepAlive, DEF_MAX_MSG); + } + public Client(Socket aSocket, ObjectInputStream iStream, + ObjectOutputStream oStream, + ISynchAsynchConnectionEventHandler handler, int keepAlive, + int maxmsgs) throws IOException { + socket = aSocket; + // Set TCP no delay + socket.setTcpNoDelay(true); + address = socket.getInetAddress().getHostName(); + port = socket.getPort(); + inputStream = iStream; + outputStream = oStream; + this.handler = handler; + this.keepAlive = keepAlive; + maxMsg = maxmsgs; + properties = new Properties(); + setupThreads(); + } + public Client(ISynchAsynchConnectionEventHandler handler, Integer keepAlive) { + this(handler, keepAlive.intValue()); + } + public Client(ISynchAsynchConnectionEventHandler handler, int keepAlive) { + this(handler, keepAlive, DEF_MAX_MSG); + } + public Client(ISynchAsynchConnectionEventHandler handler, int keepAlive, + int maxmsgs) { + this.handler = handler; + this.keepAlive = keepAlive; + maxMsg = maxmsgs; + this.properties = new Properties(); + } + + public synchronized ID getLocalID() { + if (socket == null) + return null; + ID retID = null; + try { + retID = IDFactory.makeStringID(PROTOCOL + "://" + + socket.getLocalAddress().getHostName() + ":" + port); + } catch (Exception e) { + return null; + } + return retID; + } + public synchronized void removeCommEventListener(IConnectionEventHandler l) { + eventNotify.remove(l); + } + public synchronized void addCommEventListener(IConnectionEventHandler l) { + if (eventNotify == null) { + eventNotify = new Vector(); + } + eventNotify.add(l); + } + public synchronized boolean isConnected() { + if (socket != null) { + return socket.isConnected(); + } + return false; + } + public synchronized boolean isStarted() { + if (sendThread != null) { + return sendThread.isAlive(); + } + return false; + } + + protected void fireSuspect(Exception e) { + Vector v = null; + synchronized (this) { + if (eventNotify == null) + return; + v = eventNotify; + } + for (Enumeration enum = v.elements(); enum.hasMoreElements();) { + IConnectionEventHandler h = (IConnectionEventHandler) enum + .nextElement(); + h.handleSuspectEvent(new ConnectionEvent(this, e)); + } + } + + public synchronized Object connect(ID remote, Object data, int timeout) + throws IOException { + if (socket != null) + throw new ConnectException("Client already connected"); + + URI anURI = null; + try { + anURI = remote.toURI(); + } catch (URISyntaxException e) { + throw new IOException("Can't connect to address " + + remote.getName() + ". Invalid URL"); + } + + address = anURI.getHost(); + port = anURI.getPort(); + + SocketFactory fact = SocketFactory.getSocketFactory(); + if (fact == null) { + fact = SocketFactory.getDefaultSocketFactory(); + } + if (Debug.ON && debug != null) { + debug.msg("Connecting to " + address + ":" + port); + } + // Actually connect to remote using socket from socket factory. + socket = fact.createSocket(address, port, timeout); + // Set TCP no delay + socket.setTcpNoDelay(true); + boolean compatibility = false; + //boolean compatibility = TCPCompatibility.useCompatibility; + outputStream = new ExObjectOutputStream(new BufferedOutputStream(socket + .getOutputStream()), compatibility); + outputStream.flush(); + inputStream = new ExObjectInputStream(socket.getInputStream(), + compatibility); + // send connect data + sendIt(new ConnectRequestMessage(anURI, (Serializable) data)); + + ConnectResultMessage res = null; + res = (ConnectResultMessage) readObject(); + // Setup threads + setupThreads(); + // Return results. + return res.getData(); + } + + private void setupThreads() { + // Setup threads + sendThread = (Thread) AccessController + .doPrivileged(new PrivilegedAction() { + public Object run() { + return getSendThread(); + } + }); + rcvThread = (Thread) AccessController + .doPrivileged(new PrivilegedAction() { + public Object run() { + return getRcvThread(); + } + }); + } + + private Thread getSendThread() { + Thread aThread = new Thread(new Runnable() { + public void run() { + int msgCount = 0; + Thread me = Thread.currentThread(); + // Loop until done sending messages + for (;;) { + if (me.isInterrupted()) + break; + Serializable aMsg = (Serializable) queue.peekQueue(); + if (me.isInterrupted() || aMsg == null) + break; + try { + // Actually send message + sendIt(aMsg); + // Successful...remove message from queue + queue.removeHead(); + + if (msgCount > maxMsg) { + synchronized (outputStream) { + outputStream.reset(); + } + msgCount = 0; + } else + msgCount++; + } catch (IOException e) { + // Log to stderr + Debug.errDumpStack(e, "Exception in sender thread for " + + address + ":" + port); + + if (isClosing) { + isClosing = false; + synchronized (Client.this) { + Client.this.notifyAll(); + } + } else { + if (handler.handleSuspectEvent(new ConnectionEvent( + Client.this, e))) { + handler + .handleDisconnectEvent(new DisconnectConnectionEvent( + Client.this, e, queue)); + } + } + break; + } + } + if (Debug.ON && debug != null) { + debug.msg("Sndr for " + address + ":" + port + + " terminating."); + } + } + }, "Sndr for " + address + ":" + port); + // Set priority for new thread + aThread.setPriority(SNDR_PRIORITY); + return aThread; + } + + private void closeSocket() { + // Close socket + try { + if (socket != null) + socket.close(); + } catch (IOException e) { + } + } + + private void sendIt(Serializable snd) throws IOException { + // Write object to output stream + synchronized (outputStream) { + outputStream.writeObject(snd); + outputStream.flush(); + nextPingTime = System.currentTimeMillis() + keepAlive; + } + } + + private void receiveResp() { + synchronized (outputStream) { + waitForPing = false; + nextPingTime = System.currentTimeMillis() + keepAlive; + outputStream.notifyAll(); + } + } + + public void setCloseTimeout(long t) { + closeTimeout = t; + } + + private void sendClose(Serializable snd) throws IOException { + isClosing = true; + sendIt(snd); + if (isClosing) { + try { + wait(closeTimeout); + } catch (Exception e) { + } + } + // Before returning, actually remove remote objects + handler.handleDisconnectEvent(new DisconnectConnectionEvent( + Client.this, null, queue)); + } + + private Thread getRcvThread() { + Thread aThread = new Thread(new Runnable() { + public void run() { + Thread me = Thread.currentThread(); + // Loop forever and handle objects received. + for (;;) { + if (me.isInterrupted()) + break; + try { + handleRcv(readObject()); + } catch (IOException e) { + // Log to stderr + Debug.errDumpStack(e, "Exception in read thread for " + + address + ":" + port); + + if (Debug.ON && debug != null) { + debug.dumpStack(e, "Exception in read thread for " + + address + ":" + port + ": " + + e.getMessage()); + } + if (isClosing) { + isClosing = false; + synchronized (Client.this) { + Client.this.notifyAll(); + } + } else { + if (handler.handleSuspectEvent(new ConnectionEvent( + Client.this, e))) { + handler + .handleDisconnectEvent(new DisconnectConnectionEvent( + Client.this, e, queue)); + } + } + break; + } + } + if (Debug.ON && debug != null) { + debug.msg("Rcvr for " + address + ":" + port + + " terminating."); + } + } + }, "Rcvr for " + address + ":" + port); + // Set priority and return + aThread.setPriority(RCVR_PRIORITY); + return aThread; + } + + private void handleRcv(Serializable rcv) throws IOException { + try { + // We've received some data, so the connection is alive + receiveResp(); + // Handle all messages + if (rcv instanceof SynchMessage) { + // Handle synch message. The only valid synch message is + // 'close'. + handler.handleSynchEvent(new SynchConnectionEvent(this, + ((SynchMessage) rcv).getData())); + } else if (rcv instanceof AsynchMessage) { + Serializable d = ((AsynchMessage) rcv).getData(); + // Handle asynch messages. + handler.handleAsynchEvent(new AsynchConnectionEvent(this, d)); + } else if (rcv instanceof PingMessage) { + // Handle ping by sending response back immediately + sendIt(pingResp); + } else if (rcv instanceof PingResponseMessage) { + // Do nothing with ping response + } else + throw new IOException("Invalid packet received."); + } catch (IOException e) { + disconnect(); + throw e; + } + } + + public synchronized void start() { + if (sendThread != null) + sendThread.start(); + if (rcvThread != null) + rcvThread.start(); + // Setup and start keep alive thread + if (keepAlive != 0) + keepAliveThread = setupPing(); + if (keepAliveThread != null) + keepAliveThread.start(); + } + public void stop() { + } + private Thread setupPing() { + return new Thread(new Runnable() { + public void run() { + Thread me = Thread.currentThread(); + while (!queue.isStopped()) { + try { + if (me.isInterrupted()) + break; + // Sleep for timeout interval divided by two + Thread.sleep(keepAlive / 2); + if (me.isInterrupted()) + break; + // Check to see how long it has been since our last + // send. + synchronized (outputStream) { + if (System.currentTimeMillis() >= nextPingTime) { + // If it's been longer than our timeout + // interval, then ping + waitForPing = true; + // Actually send ping instance + sendIt(ping); + if (waitForPing) { + try { + // Wait for keepAliveInterval for + // pingresp + outputStream.wait(keepAlive / 2); + } catch (InterruptedException e) { + } + } + if (waitForPing) { + throw new IOException(address + ":" + port + + " not reachable."); + } + } + } + } catch (Exception e) { + // Log to stderr + Debug.errDumpStack(e, "Exception in ping thread for " + + address + ":" + port); + + if (Debug.ON && debug != null) { + debug.dumpStack(e, "Exception in ping."); + } + if (isClosing) { + isClosing = false; + synchronized (Client.this) { + Client.this.notifyAll(); + } + } else { + if (handler.handleSuspectEvent(new ConnectionEvent( + Client.this, e))) { + handler + .handleDisconnectEvent(new DisconnectConnectionEvent( + Client.this, e, queue)); + } + } + break; + } + } + if (Debug.ON && debug != null) { + debug.msg("Keepalive terminating."); + } + } + }, "Keepalive " + address + ":" + port); + } + + public synchronized void disconnect() throws IOException { + if (Debug.ON && debug != null) { + debug.msg("disconnect()"); + } + // Close send queue and socket + queue.close(); + closeSocket(); + // Notify sender in case it's waiting for a response + // Zap keep alive thread + if (keepAliveThread != null) { + keepAliveThread.interrupt(); + keepAliveThread = null; + } + if (sendThread != null) { + sendThread.interrupt(); + sendThread = null; + } + if (rcvThread != null) { + rcvThread.interrupt(); + rcvThread = null; + } + // Notify any threads waiting to get hold of our lock + notifyAll(); + } + + public void sendAsynch(ID recipient, byte[] obj) throws IOException { + queueObject(recipient, obj); + } + public void sendAsynch(ID recipien, Object obj) throws IOException { + queueObject(recipien, (Serializable) obj); + } + public synchronized void queueObject(ID recipient, Serializable obj) + throws IOException { + if (queue.isStopped() || isClosing) + throw new ConnectException("Not connected"); + queue.enqueue(new AsynchMessage(obj)); + } + public synchronized Serializable sendObject(ID recipient, Serializable obj) + throws IOException { + if (queue.isStopped() || isClosing) + throw new ConnectException("Not connected"); + sendClose(new SynchMessage(obj)); + return null; + } + + public Object sendSynch(ID rec, Object obj) throws IOException { + return sendObject(rec, (Serializable) obj); + } + + public Object sendSynch(ID rec, byte[] obj) throws IOException { + return sendObject(rec, obj); + } + private Serializable readObject() throws IOException { + Serializable ret = null; + try { + ret = (Serializable) inputStream.readObject(); + } catch (ClassNotFoundException e) { + if (Debug.ON && debug != null) { + debug.dumpStack(e, "Class not found exception"); + } + throw new IOException( + "Protocol violation due to class load failure. " + + e.getMessage()); + } + return ret; + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java new file mode 100644 index 000000000..4b3b548a1 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java @@ -0,0 +1,27 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.Serializable; +import java.net.URI; + +public class ConnectRequestMessage implements Serializable { + + URI target; + Serializable data; + + public ConnectRequestMessage(URI target, Serializable data) { + this.target = target; + this.data = data; + } + + public URI getTarget() { + return target; + } + + public Serializable getData() { + return data; + } + + public String toString() { + return "ConnectRequestMessage[target:" + target + ";data:" + data + "]"; + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java new file mode 100644 index 000000000..56ee36524 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java @@ -0,0 +1,16 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.Serializable; + +public class ConnectResultMessage implements Serializable { + + Serializable data; + + public ConnectResultMessage(Serializable data) { + this.data = data; + } + + public Serializable getData() { + return data; + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java new file mode 100644 index 000000000..4f6853a59 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java @@ -0,0 +1,54 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; + +import org.eclipse.ecf.provider.Debug; + +public class ExObjectInputStream extends ObjectInputStream { + + private boolean replace = false; + + public static final Debug debug = Debug.create(ExObjectInputStream.class + .getName()); + + public ExObjectInputStream(InputStream in) throws IOException, + SecurityException { + super(in); + if (Debug.ON && debug != null) { + debug.msg("ExObjectInputStream(" + in + ")"); + } + } + + public ExObjectInputStream(InputStream in, boolean backwardCompatibility) + throws IOException, SecurityException { + super(in); + if (Debug.ON && debug != null) { + debug.msg("ExObjectInputStream(" + in + "," + backwardCompatibility + + ")"); + } + if (backwardCompatibility) { + try { + super.enableResolveObject(true); + replace = true; + debug("ExObjectInputStream.compatibility set"); + } catch (Exception e) { + throw new IOException( + "Could not setup backward compatibility object replacers for ExObjectInputStream"); + } + } + } + + protected void debug(String msg) { + if (Debug.ON && debug != null) { + debug.msg(msg); + } + } + protected void debug(String msg, Throwable t) { + if (Debug.ON && debug != null) { + debug.dumpStack(t, msg); + } + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java new file mode 100644 index 000000000..ffb02010c --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java @@ -0,0 +1,45 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.OutputStream; + +import org.eclipse.ecf.provider.Debug; + +public class ExObjectOutputStream extends ObjectOutputStream { + + private boolean replace = false; + + public static final Debug debug = Debug.create(ExObjectOutputStream.class + .getName()); + + public ExObjectOutputStream(OutputStream out) throws IOException { + super(out); + debug("ExObjectOutputStream(" + out + ")"); + } + + public ExObjectOutputStream(OutputStream out, boolean backwardCompatibility) + throws IOException, SecurityException { + this(out); + if (backwardCompatibility) { + try { + super.enableReplaceObject(true); + replace = true; + debug("ExObjectOutputStream.compatibility set"); + } catch (Exception e) { + throw new IOException( + "Could not setup backward compatibility object replacers for ExObjectOutputStream"); + } + } + } + protected void debug(String msg) { + if (Debug.ON && debug != null) { + debug.msg(msg); + } + } + protected void debug(String msg, Throwable t) { + if (Debug.ON && debug != null) { + debug.dumpStack(t, msg); + } + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IClientSocketFactory.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IClientSocketFactory.java new file mode 100644 index 000000000..46e775ee7 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IClientSocketFactory.java @@ -0,0 +1,8 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.IOException; +import java.net.Socket; + +public interface IClientSocketFactory { + Socket createSocket(String name, int port, int timeout) throws IOException; +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IServerSocketFactory.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IServerSocketFactory.java new file mode 100644 index 000000000..ab6a443f8 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/IServerSocketFactory.java @@ -0,0 +1,9 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.IOException; +import java.net.ServerSocket; + +public interface IServerSocketFactory { + ServerSocket createServerSocket(int port, int backlog) throws IOException; +} + diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ISocketAcceptHandler.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ISocketAcceptHandler.java new file mode 100644 index 000000000..74a5f4eea --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ISocketAcceptHandler.java @@ -0,0 +1,7 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.net.Socket; + +public interface ISocketAcceptHandler { + public void handleAccept(Socket aSocket) throws Exception; +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java new file mode 100644 index 000000000..f17b7ebd2 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java @@ -0,0 +1,8 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.Serializable; + +public class PingMessage implements Serializable { + protected PingMessage() { + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java new file mode 100644 index 000000000..d262b1fb3 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java @@ -0,0 +1,8 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.Serializable; + +public class PingResponseMessage implements Serializable { + protected PingResponseMessage() { + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java new file mode 100644 index 000000000..694cfabcb --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java @@ -0,0 +1,93 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +import org.eclipse.ecf.provider.Debug; + +public class Server extends ServerSocket { + public static Debug debug = Debug.create(Server.class.getName()); + + ISocketAcceptHandler acceptHandler; + + Thread listenerThread; + ThreadGroup threadGroup; + + public Server(ThreadGroup group, int port, ISocketAcceptHandler handler) + throws IOException { + super(port); + if (handler == null) + throw new InstantiationError("Listener cannot be null"); + acceptHandler = handler; + threadGroup = group; + listenerThread = setupListener(); + listenerThread.start(); + } + + public Server(int port, ISocketAcceptHandler handler) throws IOException { + this(null, port, handler); + } + + protected Thread setupListener() { + return new Thread(threadGroup, new Runnable() { + public void run() { + while (true) { + try { + handleAccept(accept()); + } catch (Exception e) { + if (Debug.ON && debug != null) { + debug.dumpStack(e, "Exception in accept"); + } + // If we get an exception on accept(), we should just + // exit + break; + } + } + if (Debug.ON && debug != null) { + debug.msg("Closing listener normally."); + } + } + }, "Server(" + getLocalPort() + ")"); + } + + protected void handleAccept(final Socket aSocket) { + new Thread(threadGroup, new Runnable() { + public void run() { + try { + acceptHandler.handleAccept(aSocket); + } catch (Exception e) { + if (Debug.ON && debug != null) { + debug.dumpStack(e, + "Unexplained exception in connect. Closing."); + } + try { + aSocket.close(); + } catch (IOException e1) { + } + ; + } finally { + if (Debug.ON && debug != null) { + debug.msg("handleAcceptAsych terminating."); + } + } + } + }).start(); + } + + public synchronized void close() throws IOException { + if (Debug.ON && debug != null) { + debug.msg("close()"); + } + super.close(); + if (listenerThread != null) { + listenerThread.interrupt(); + listenerThread = null; + } + if (threadGroup != null) { + threadGroup.interrupt(); + threadGroup = null; + } + acceptHandler = null; + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SocketFactory.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SocketFactory.java new file mode 100644 index 000000000..6e7f437ce --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SocketFactory.java @@ -0,0 +1,50 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; + +public class SocketFactory implements IClientSocketFactory, + IServerSocketFactory { + + protected static SocketFactory defaultFactory; + protected static SocketFactory factory = null; + + public Socket createSocket(String name, int port, int timeout) + throws IOException { + if (factory != null) { + return factory.createSocket(name, port, timeout); + } else { + Socket s = new Socket(); + s.connect(new InetSocketAddress(name, port), timeout); + return s; + } + } + + public ServerSocket createServerSocket(int port, int backlog) + throws IOException { + if (factory != null) { + return factory.createServerSocket(port, backlog); + } else + return new ServerSocket(port, backlog); + } + + public static synchronized SocketFactory getSocketFactory() { + return factory; + } + + public synchronized static SocketFactory getDefaultSocketFactory() { + if (defaultFactory == null) { + defaultFactory = new SocketFactory(); + } + return defaultFactory; + } + + public synchronized static void setSocketFactory(SocketFactory fact) { + if (!fact.equals(defaultFactory)) { + factory = fact; + } + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java new file mode 100644 index 000000000..e7c6d8872 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java @@ -0,0 +1,13 @@ +package org.eclipse.ecf.provider.comm.tcp; + +import java.io.Serializable; + +public class SynchMessage extends AsynchMessage { + + protected SynchMessage(Serializable data) { + super(data); + } + protected SynchMessage() { + super(); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java new file mode 100644 index 000000000..679f5d375 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java @@ -0,0 +1,288 @@ +package org.eclipse.ecf.provider.generic; + +import java.io.IOException; +import java.io.InvalidObjectException; +import java.io.Serializable; +import java.net.ConnectException; + +import org.eclipse.ecf.core.ISharedObjectContainerConfig; +import org.eclipse.ecf.core.SharedObjectContainerJoinException; +import org.eclipse.ecf.core.SharedObjectDescription; +import org.eclipse.ecf.core.comm.AsynchConnectionEvent; +import org.eclipse.ecf.core.comm.ConnectionInstantiationException; +import org.eclipse.ecf.core.comm.DisconnectConnectionEvent; +import org.eclipse.ecf.core.comm.IAsynchConnection; +import org.eclipse.ecf.core.comm.IConnection; +import org.eclipse.ecf.core.comm.ISynchAsynchConnection; +import org.eclipse.ecf.core.comm.SynchConnectionEvent; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.provider.generic.gmm.Member; + +public abstract class ClientSOContainer extends SOContainer { + ISynchAsynchConnection connection; + ID remoteServerID; + byte connectionState; + public static final byte UNCONNECTED = 0; + public static final byte CONNECTING = 1; + public static final byte CONNECTED = 2; + static final class Lock { + } + Lock connectLock; + + public ClientSOContainer(ISharedObjectContainerConfig config) { + super(config); + connection = null; + connectionState = UNCONNECTED; + connectLock = new Lock(); + } + public final boolean isGroupServer() { + return false; + } + public final boolean isGroupManager() { + return false; + } + public ID getGroupID() { + return remoteServerID; + } + public void joinGroup(ID remoteSpace, Object data) + throws SharedObjectContainerJoinException { + try { + if (isClosing) + throw new IllegalStateException("container is closing"); + ISynchAsynchConnection aConnection = getClientConnection( + remoteSpace, data); + if (aConnection == null) { + ConnectException c = new ConnectException("join failed to" + + ":" + remoteSpace.getName()); + throw c; + } + ContainerMessage response; + synchronized (connectLock) { + if (isConnected()) { + killConnection(aConnection); + aConnection = null; + ConnectException c = new ConnectException( + "already connected to " + getGroupID()); + throw c; + } + if (isConnecting()) { + killConnection(aConnection); + aConnection = null; + ConnectException c = new ConnectException( + "currently connecting"); + throw c; + } + connectionState = CONNECTING; + connection = aConnection; + } + synchronized (aConnection) { + try { + Object connectData = getConnectData(remoteSpace, data); + response = (ContainerMessage) aConnection.connect( + remoteSpace, connectData, 0); + } catch (IOException e) { + synchronized (connectLock) { + killConnection(aConnection); + if (connection != aConnection) { + aConnection = null; + throw e; + } + connectionState = UNCONNECTED; + connection = null; + remoteServerID = null; + } + throw e; + } + synchronized (connectLock) { + // If not in correct state, disconnect and return + if (connection != aConnection) { + killConnection(aConnection); + aConnection = null; + ConnectException c = new ConnectException( + "join failed because not in correct state"); + throw c; + } + ID serverID = null; + try { + serverID = acceptNewServer(response); + } catch (Exception e) { + killConnection(aConnection); + aConnection = null; + connection = null; + remoteServerID = null; + connectionState = UNCONNECTED; + ConnectException c = new ConnectException( + "join refused locally via acceptNewServer"); + throw c; + } + aConnection.start(); + remoteServerID = serverID; + connectionState = CONNECTED; + } + } + } catch (Exception e) { + throw new SharedObjectContainerJoinException("could not join", e); + } + } + + protected void forwardExcluding(ID from, ID excluding, ContainerMessage data) + throws IOException { + // NOP + } + protected Serializable getConnectData(ID target, Object data) { + return ContainerMessage.makeJoinGroupMessage(getID(), target, + getNextSequenceNumber(), (Serializable) data); + } + protected Serializable getLeaveData(ID target) { + return null; + } + public void leaveGroup() { + synchronized (connectLock) { + // If we are currently connected + if (isConnected()) { + synchronized (connection) { + try { + connection.sendSynch(remoteServerID, + getBytesForObject(ContainerMessage + .makeLeaveGroupMessage(getID(), + remoteServerID, + getNextSequenceNumber(), + getLeaveData(remoteServerID)))); + } catch (Exception e) { + } + synchronized (getGroupMembershipLock()) { + memberLeave(remoteServerID, connection); + } + } + } + connectionState = UNCONNECTED; + connection = null; + remoteServerID = null; + } + } + + protected abstract ISynchAsynchConnection getClientConnection( + ID remoteSpace, Object data) throws ConnectionInstantiationException; + + protected void handleChangeMsg(ID fromID, ID toID, long seqNum, + Serializable data) throws IOException { + ContainerMessage.ViewChangeMessage c = null; + // Check data in packge for validity + ID ids[] = null; + try { + c = (ContainerMessage.ViewChangeMessage) data; + if (fromID == null || c == null) + throw new Exception(); + ids = c.changeIDs; + if (ids == null || ids[0] == null || !fromID.equals(remoteServerID)) + throw new IOException(); + } catch (Exception e) { + InvalidObjectException t = new InvalidObjectException("bad data" + + ":" + fromID + ":" + toID + ":" + seqNum); + throw t; + } + // Now actually add/remove member + Member m = new Member(ids[0]); + synchronized (getGroupMembershipLock()) { + if (c.add) { + groupManager.addMember(m); + } else + groupManager.removeMember(m); + } + } + + protected void queueContainerMessage(ContainerMessage message) + throws IOException { + // Do it + connection.sendAsynch(message.getToContainerID(), + getBytesForObject(message)); + } + protected void forwardExcluding(ID from, ID excluding, byte msg, + Serializable data) throws IOException { /* NOP */ + } + protected void forwardToRemote(ID from, ID to, ContainerMessage message) + throws IOException { /* NOP */ + } + protected ID getIDForConnection(IAsynchConnection conn) { + return remoteServerID; + } + protected void memberLeave(ID fromID, IAsynchConnection conn) { + if (fromID.equals(remoteServerID)) { + groupManager.removeAllMembers(); + super.memberLeave(fromID, conn); + connectionState = UNCONNECTED; + connection = null; + remoteServerID = null; + } else if (fromID.equals(getID())) { + super.memberLeave(fromID, conn); + } + } + protected void sendMessage(ContainerMessage data) throws IOException { + // Get connect lock, then call super version + synchronized (connectLock) { + checkConnected(); + super.sendMessage(data); + } + } + protected ID[] sendCreateMsg(ID toID, SharedObjectDescription createInfo) + throws IOException { + // Get connect lock, then call super version + synchronized (connectLock) { + checkConnected(); + return super.sendCreateSharedObjectMessage(toID, createInfo); + } + } + protected void processDisconnect(DisconnectConnectionEvent evt) { + // Get connect lock, and just return if this connection has been + // terminated + synchronized (connectLock) { + super.processDisconnect(evt); + } + } + protected void processAsynchPacket(AsynchConnectionEvent evt) + throws IOException { + // Get connect lock, then call super version + synchronized (connectLock) { + checkConnected(); + super.processAsynch(evt); + } + } + protected Serializable processSynch(SynchConnectionEvent evt) + throws IOException { + synchronized (connectLock) { + checkConnected(); + IConnection conn = evt.getConnection(); + if (connection != conn) + throw new ConnectException("not connected"); + return super.processSynch(evt); + } + } + + protected boolean isConnected() { + return (connectionState == CONNECTED); + } + protected boolean isConnecting() { + return (connectionState == CONNECTING); + } + private void checkConnected() throws ConnectException { + if (!isConnected()) + throw new ConnectException("not connected"); + } + protected ID acceptNewServer(ContainerMessage serverData) throws Exception { + ContainerMessage aPacket = serverData; + ID fromID = aPacket.getFromContainerID(); + + if (fromID == null) + throw new InvalidObjectException("server id is null"); + + ID[] ids = ((ContainerMessage.ViewChangeMessage) aPacket.getData()).changeIDs; + if (ids == null) + throw new java.io.InvalidObjectException("id array null"); + for (int i = 0; i < ids.length; i++) { + ID id = ids[i]; + if (id != null && !id.equals(getID())) + addNewRemoteMember(id, null); + } + return fromID; + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerInstantiator.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerInstantiator.java new file mode 100644 index 000000000..6759a236e --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerInstantiator.java @@ -0,0 +1,32 @@ +package org.eclipse.ecf.provider.generic; + +import org.eclipse.ecf.core.ISharedObjectContainer; +import org.eclipse.ecf.core.SharedObjectContainerInstantiationException; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.provider.ISharedObjectContainerInstantiator; + +public class ContainerInstantiator implements ISharedObjectContainerInstantiator { + + public ContainerInstantiator() { + super(); + } + + public ISharedObjectContainer makeInstance(Class[] argTypes, Object[] args) + throws SharedObjectContainerInstantiationException { + try { + Boolean isClient = null; + ID id = null; + isClient = (Boolean) args[0]; + id = (ID) args[1]; + ISharedObjectContainer result = null; + if (isClient.booleanValue()) { + return new TCPClientSOContainer(new SOContainerConfig(id)); + } else { + return new TCPServerSOContainer(new SOContainerConfig(id)); + } + } catch (Exception e) { + throw new SharedObjectContainerInstantiationException("Exception creating container",e); + } + } + +} diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java new file mode 100644 index 000000000..3728b67e0 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java @@ -0,0 +1,166 @@ +package org.eclipse.ecf.provider.generic; + +import java.io.Serializable; +import org.eclipse.ecf.core.identity.ID; + +public class ContainerMessage implements Serializable { + + ID fromContainerID; + public ID toContainerID; + long sequence; + Serializable data; + + /** + * @return Returns the data. + */ + public Serializable getData() { + return data; + } + /** + * @param data + * The data to set. + */ + public void setData(Serializable data) { + this.data = data; + } + /** + * @return Returns the fromContainerID. + */ + public ID getFromContainerID() { + return fromContainerID; + } + /** + * @param fromContainerID + * The fromContainerID to set. + */ + public void setFromContainerID(ID fromContainerID) { + this.fromContainerID = fromContainerID; + } + /** + * @return Returns the sequence. + */ + public long getSequence() { + return sequence; + } + /** + * @param sequence + * The sequence to set. + */ + public void setSequence(long sequence) { + this.sequence = sequence; + } + /** + * @return Returns the toContainerID. + */ + public ID getToContainerID() { + return toContainerID; + } + /** + * @param toContainerID + * The toContainerID to set. + */ + public void setToContainerID(ID toContainerID) { + this.toContainerID = toContainerID; + } + static ContainerMessage makeViewChangeMessage(ID from, ID to, long seq, + ID ids[], boolean add, Serializable data) { + return new ContainerMessage(from, to, seq, new ViewChangeMessage(ids, + add, data)); + } + static ContainerMessage makeJoinGroupMessage(ID from, ID to, long seq, + Serializable data) { + return new ContainerMessage(from, to, seq, new JoinGroupMessage(data)); + } + static ContainerMessage makeLeaveGroupMessage(ID from, ID to, long seq, + Serializable data) { + return new ContainerMessage(from, to, seq, new LeaveGroupMessage(data)); + } + static ContainerMessage makeSharedObjectCreateMessage(ID from, ID to, + long seq, Serializable data) { + return new ContainerMessage(from, to, seq, new CreateMessage(data)); + } + static ContainerMessage makeSharedObjectCreateResponseMessage(ID from, + ID to, long contSeq, ID soID, Throwable e, long sequence) { + return new ContainerMessage(from, to, contSeq, + new CreateResponseMessage(soID, e, sequence)); + } + static ContainerMessage makeSharedObjectMessage(ID from, ID to, long seq, + ID fromSharedObject, Serializable data) { + return new ContainerMessage(from, to, seq, new SharedObjectMessage( + fromSharedObject, data)); + } + static ContainerMessage makeSharedObjectDisposeMessage(ID from, ID to, + long seq, ID sharedObjectID) { + return new ContainerMessage(from, to, seq, + new SharedObjectDisposeMessage(sharedObjectID)); + } + protected ContainerMessage(ID from, ID to, long seq, Serializable data) { + this.fromContainerID = from; + this.toContainerID = to; + this.sequence = seq; + this.data = data; + } + + public static final class ViewChangeMessage implements Serializable { + ID changeIDs[]; + boolean add; + Serializable data; + ViewChangeMessage(ID id[], boolean a, Serializable data) { + this.changeIDs = id; + this.add = a; + this.data = data; + } + } + public static final class CreateMessage implements Serializable { + Serializable data; + CreateMessage(Serializable data) { + this.data = data; + } + } + public static final class CreateResponseMessage implements Serializable { + ID sharedObjectID; + Throwable exception; + long sequence; + public CreateResponseMessage(ID objID, Throwable except, long sequence) { + this.sharedObjectID = objID; + this.exception = except; + this.sequence = sequence; + } + } + public static final class SharedObjectMessage implements Serializable { + Serializable data; + ID fromSharedObjectID; + SharedObjectMessage(ID fromSharedObject, Serializable data) { + this.fromSharedObjectID = fromSharedObject; + this.data = data; + } + } + public static final class SharedObjectDisposeMessage implements + Serializable { + ID sharedObjectID; + SharedObjectDisposeMessage(ID objID) { + this.sharedObjectID = objID; + } + } + + public static final class JoinGroupMessage implements Serializable { + Serializable data; + + public JoinGroupMessage(Serializable data) { + this.data = data; + } + public Serializable getData() { + return data; + } + } + public static final class LeaveGroupMessage implements Serializable { + Serializable data; + + public LeaveGroupMessage(Serializable data) { + this.data = data; + } + public Serializable getData() { + return data; + } + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java new file mode 100644 index 000000000..8dceedd7c --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java @@ -0,0 +1,119 @@ +/* + * Created on Dec 6, 2004 + * + */ +package org.eclipse.ecf.provider.generic; + +import org.eclipse.ecf.core.util.EnqueuePredicate; +import org.eclipse.ecf.core.util.Event; +import org.eclipse.ecf.core.util.QueueEnqueue; +import org.eclipse.ecf.core.util.QueueException; +import org.eclipse.ecf.core.util.SimpleQueueImpl; + +public class QueueEnqueueImpl implements QueueEnqueue { + + SimpleQueueImpl queue = null; + + public QueueEnqueueImpl(SimpleQueueImpl impl) { + super(); + this.queue = impl; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue(org.eclipse.ecf.core.util.Event) + */ + public void enqueue(Event element) throws QueueException { + queue.enqueue(element); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue(org.eclipse.ecf.core.util.Event[]) + */ + public void enqueue(Event[] elements) throws QueueException { + if (elements != null) { + for (int i = 0; i < elements.length; i++) { + enqueue(elements[i]); + } + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue_prepare(org.eclipse.ecf.core.util.Event[]) + */ + public Object enqueue_prepare(Event[] elements) throws QueueException { + return elements; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue_commit(java.lang.Object) + */ + public void enqueue_commit(Object enqueue_key) { + if (enqueue_key instanceof Event[]) { + Event[] events = (Event[]) enqueue_key; + try { + enqueue(events); + } catch (QueueException e) { + // this should not happen + e.printStackTrace(System.err); + } + } + + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue_abort(java.lang.Object) + */ + public void enqueue_abort(Object enqueue_key) { + // Do nothing + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.util.QueueEnqueue#enqueue_lossy(org.eclipse.ecf.core.util.Event) + */ + public boolean enqueue_lossy(Event element) { + queue.enqueue(element); + return true; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.util.QueueEnqueue#setEnqueuePredicate(org.eclipse.ecf.core.util.EnqueuePredicate) + */ + public void setEnqueuePredicate(EnqueuePredicate pred) { + // This queue does not support enqueue predicate + // So we do nothing + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.util.QueueEnqueue#getEnqueuePredicate() + */ + public EnqueuePredicate getEnqueuePredicate() { + // We don't support enqueue predicate, so return null; + return null; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.util.QueueEnqueue#size() + */ + public int size() { + return queue.size(); + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConfig.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConfig.java new file mode 100644 index 000000000..a4b48b7cf --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConfig.java @@ -0,0 +1,84 @@ +/* + * Created on Nov 29, 2004 + * + */ +package org.eclipse.ecf.provider.generic; + +import java.util.Map; + +import org.eclipse.ecf.core.ISharedObjectConfig; +import org.eclipse.ecf.core.ISharedObjectContext; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.QueueEnqueue; + +public class SOConfig implements ISharedObjectConfig { + + SOContainer container = null; + ID sharedObjectID; + ID homeContainerID; + boolean isActive; + Map properties; + SOContext standAloneContext; + + public SOConfig(ID sharedObjectID, ID homeContainerID, SOContainer cont, + Map dict) { + super(); + this.sharedObjectID = sharedObjectID; + this.homeContainerID = homeContainerID; + isActive = false; + properties = dict; + this.container = cont; + } + + protected void makeActive(QueueEnqueue queue) { + isActive = true; + this.standAloneContext = new SOContext(sharedObjectID, homeContainerID, + container, properties, queue); + } + + protected void makeInactive() { + this.standAloneContext.makeInactive(); + this.standAloneContext = null; + isActive = false; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectConfig#getSharedObjectID() + */ + public ID getSharedObjectID() { + return sharedObjectID; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectConfig#getHomeContainerID() + */ + public ID getHomeContainerID() { + return homeContainerID; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectConfig#getContext() + */ + public ISharedObjectContext getContext() { + if (isActive) { + return null; + } else + return null; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectConfig#getProperties() + */ + public Map getProperties() { + return properties; + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConnector.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConnector.java new file mode 100644 index 000000000..c5d78bf8f --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOConnector.java @@ -0,0 +1,116 @@ +/* + * Created on Dec 20, 2004 + * + */ +package org.eclipse.ecf.provider.generic; + +import java.util.Enumeration; +import java.util.Hashtable; + +import org.eclipse.ecf.core.ISharedObjectConnector; +import org.eclipse.ecf.core.events.ISharedObjectEvent; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.AsynchResult; +import org.eclipse.ecf.core.util.QueueEnqueue; +import org.eclipse.ecf.core.util.QueueException; +import org.eclipse.ecf.provider.generic.events.SharedObjectCallEvent; + +public class SOConnector implements ISharedObjectConnector { + + ID sender; + Hashtable receiverQueues = null; + + public SOConnector(ID sender, ID[] recv, QueueEnqueue[] queues) { + super(); + this.receiverQueues = new Hashtable(); + for (int i = 0; i < recv.length; i++) { + receiverQueues.put(recv[i], queues[i]); + } + } + + protected void fireEvent(ISharedObjectEvent event) throws QueueException { + for (Enumeration e = receiverQueues.elements(); e.hasMoreElements();) { + QueueEnqueue queue = (QueueEnqueue) e.nextElement(); + queue.enqueue(event); + } + } + protected void fireEvents(ISharedObjectEvent[] event) throws QueueException { + for (Enumeration e = receiverQueues.elements(); e.hasMoreElements();) { + QueueEnqueue queue = (QueueEnqueue) e.nextElement(); + if (queue != null) { + queue.enqueue(event); + } + } + } + protected AsynchResult[] fireCallEvent(ISharedObjectEvent event) + throws QueueException { + AsynchResult[] results = new AsynchResult[receiverQueues.size()]; + int i = 0; + for (Enumeration e = receiverQueues.elements(); e.hasMoreElements();) { + QueueEnqueue queue = (QueueEnqueue) e.nextElement(); + results[i] = new AsynchResult(); + queue.enqueue(new SharedObjectCallEvent(event + .getSenderSharedObjectID(), event, results[i])); + } + return results; + } + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectConnector#getSender() + */ + public ID getSender() { + return sender; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectConnector#getReceivers() + */ + public ID[] getReceivers() { + return (ID[]) receiverQueues.keySet().toArray( + new ID[receiverQueues.size()]); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectConnector#enqueue(org.eclipse.ecf.core.events.ISharedObjectEvent) + */ + public void enqueue(ISharedObjectEvent event) throws QueueException { + fireEvent(event); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectConnector#enqueue(org.eclipse.ecf.core.events.ISharedObjectEvent[]) + */ + public void enqueue(ISharedObjectEvent[] events) throws QueueException { + fireEvents(events); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectConnector#callAsynch(org.eclipse.ecf.core.events.ISharedObjectEvent) + */ + public AsynchResult[] callAsynch(ISharedObjectEvent arg) throws Exception { + return fireCallEvent(arg); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectConnector#dispose() + */ + public void dispose() { + if (receiverQueues != null) { + receiverQueues.clear(); + receiverQueues = null; + } + sender = null; + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java new file mode 100644 index 000000000..fe39e8938 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java @@ -0,0 +1,675 @@ +/* + * Created on Dec 16, 2004 + * + */ +package org.eclipse.ecf.provider.generic; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.NotSerializableException; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Iterator; +import java.util.Map; +import java.util.Vector; + +import org.eclipse.ecf.core.IOSGIService; +import org.eclipse.ecf.core.ISharedObject; +import org.eclipse.ecf.core.ISharedObjectConfig; +import org.eclipse.ecf.core.ISharedObjectContainer; +import org.eclipse.ecf.core.ISharedObjectContainerConfig; +import org.eclipse.ecf.core.ISharedObjectContainerListener; +import org.eclipse.ecf.core.ISharedObjectContainerTransaction; +import org.eclipse.ecf.core.ISharedObjectManager; +import org.eclipse.ecf.core.SharedObjectAddException; +import org.eclipse.ecf.core.SharedObjectContainerJoinException; +import org.eclipse.ecf.core.SharedObjectDescription; +import org.eclipse.ecf.core.SharedObjectInitException; +import org.eclipse.ecf.core.comm.AsynchConnectionEvent; +import org.eclipse.ecf.core.comm.ConnectionEvent; +import org.eclipse.ecf.core.comm.DisconnectConnectionEvent; +import org.eclipse.ecf.core.comm.IConnection; +import org.eclipse.ecf.core.comm.ISynchAsynchConnectionEventHandler; +import org.eclipse.ecf.core.comm.SynchConnectionEvent; +import org.eclipse.ecf.core.events.IContainerEvent; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.Event; +import org.eclipse.ecf.provider.generic.gmm.Member; + +public abstract class SOContainer implements ISharedObjectContainer { + + public static final String DEFAULT_OBJECT_ARG_KEY = SOContainer.class + .getName() + + ".sharedobjectargs"; + public static final String DEFAULT_OBJECT_ARGTYPES_KEY = SOContainer.class + .getName() + + ".sharedobjectargs"; + + private long sequenceNumber = 0L; + private Vector listeners = null; + + protected ISharedObjectContainerConfig config = null; + protected SOContainerGMM groupManager = null; + protected ThreadGroup loadingThreadGroup = null; + protected ThreadGroup sharedObjectThreadGroup = null; + protected SOManager sharedObjectManager = null; + protected boolean isClosing = false; + protected MessageReceiver receiver; + + protected byte[] getBytesForObject(Serializable obj) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(obj); + return bos.toByteArray(); + } + + protected void killConnection(IConnection conn) { + try { + if (conn != null) + conn.disconnect(); + } catch (IOException e) { + // XXX report + } + } + protected void memberLeave(ID target, IConnection conn) { + if (conn != null) + killConnection(conn); + } + protected void fireContainerEvent(IContainerEvent event) { + synchronized (listeners) { + for (Iterator i = listeners.iterator(); i.hasNext();) + ((ISharedObjectContainerListener) i.next()).handleEvent(event); + } + } + protected long getNextSequenceNumber() { + if (sequenceNumber == Long.MAX_VALUE) { + sequenceNumber = 0; + return sequenceNumber; + } else + return sequenceNumber++; + } + protected Object getGroupMembershipLock() { + return groupManager; + } + public ID[] getOtherMemberIDs() { + return groupManager.getOtherMemberIDs(); + } + protected ISharedObject getSharedObject(ID id) { + SOWrapper wrap = getSharedObjectWrapper(id); + if (wrap == null) + return null; + else + return wrap.getSharedObject(); + } + protected ISharedObject addSharedObjectAndWait(SharedObjectDescription sd, + ISharedObject s, ISharedObjectContainerTransaction t) + throws Exception { + if (sd.getID() == null || s == null) + return null; + ISharedObject so = addSharedObject0(sd, s); + // Wait right here until committed + if (t != null) + t.waitToCommit(); + return s; + } + protected ISharedObject addSharedObject0(SharedObjectDescription sd, + ISharedObject s) throws Exception { + addSharedObjectWrapper(makeNewSharedObjectWrapper(sd, s)); + return s; + } + protected SOWrapper makeNewSharedObjectWrapper(SharedObjectDescription sd, + ISharedObject s) { + SOConfig newConfig = makeNewSharedObjectConfig(sd, this); + return new SOWrapper(newConfig, s, this); + } + protected SOConfig makeNewSharedObjectConfig(SharedObjectDescription sd, + SOContainer cont) { + ID homeID = sd.getHomeID(); + if (homeID == null) + homeID = getID(); + return new SOConfig(sd.getID(), homeID, this, sd.getProperties()); + } + protected SOWrapper getSharedObjectWrapper(ID id) { + return groupManager.getFromActive(id); + } + protected void addSharedObjectWrapper(SOWrapper wrapper) throws Exception { + if (wrapper == null) + return; + ID id = wrapper.getObjID(); + synchronized (getGroupMembershipLock()) { + Object obj = groupManager.getFromAny(id); + if (obj != null) { + throw new SharedObjectAddException("SharedObject with id " + + id.getName() + " already in container"); + } + // Call initialize. If this throws it halts everything + wrapper.init(); + // Put in table + groupManager.addSharedObjectToActive(wrapper); + } + } + protected ISharedObject removeSharedObject(ID id) { + synchronized (getGroupMembershipLock()) { + SOWrapper wrap = groupManager.getFromActive(id); + if (wrap == null) + return null; + groupManager.removeSharedObject(id); + return wrap.getSharedObject(); + } + } + protected boolean addNewRemoteMember(ID memberID, Object data) { + return groupManager.addMember(new Member(memberID, data)); + } + abstract protected void queueContainerMessage(ContainerMessage mess) + throws IOException; + abstract protected void forwardToRemote(ID from, ID to, + ContainerMessage data) throws IOException; + abstract protected void forwardExcluding(ID from, ID excluding, + ContainerMessage data) throws IOException; + protected final void forward(ID fromID, ID toID, ContainerMessage data) + throws IOException { + if (toID == null) { + forwardExcluding(fromID, fromID, data); + } else { + forwardToRemote(fromID, toID, data); + } + } + protected boolean removeRemoteMember(ID remoteMember) { + return groupManager.removeMember(remoteMember); + } + protected void sendMessage(ContainerMessage data) throws IOException { + synchronized (getGroupMembershipLock()) { + ID ourID = getID(); + // We don't send to ourselves + if (!ourID.equals(data.getToContainerID())) + queueContainerMessage(data); + } + } + protected ID[] sendCreateSharedObjectMessage(ID toContainerID, + SharedObjectDescription sd) throws IOException { + ID[] returnIDs = null; + if (toContainerID == null) { + synchronized (getGroupMembershipLock()) { + // Send message to all + sendMessage(ContainerMessage.makeSharedObjectCreateMessage( + getID(), toContainerID, getNextSequenceNumber(), sd)); + returnIDs = getOtherMemberIDs(); + } + } else { + // If the create msg is directed to this space, no msg will be sent + if (getID().equals(toContainerID)) { + returnIDs = new ID[0]; + } else { + sendMessage(ContainerMessage.makeSharedObjectCreateMessage( + getID(), toContainerID, getNextSequenceNumber(), sd)); + returnIDs = new ID[1]; + returnIDs[0] = toContainerID; + } + } + return returnIDs; + } + protected void sendCreateResponseSharedObjectMessage(ID toContainerID, + ID fromSharedObject, Throwable t, long ident) throws IOException { + sendMessage(ContainerMessage.makeSharedObjectCreateResponseMessage( + getID(), toContainerID, getNextSequenceNumber(), + fromSharedObject, t, ident)); + } + protected void sendSharedObjectMessage(ID toContainerID, + ID fromSharedObject, Serializable data) throws IOException { + sendMessage(ContainerMessage.makeSharedObjectMessage(getID(), + toContainerID, getNextSequenceNumber(), fromSharedObject, data)); + } + protected void sendDisposeSharedObjectMessage(ID toContainerID, + ID fromSharedObject) throws IOException { + sendMessage(ContainerMessage.makeSharedObjectDisposeMessage(getID(), + toContainerID, getNextSequenceNumber(), fromSharedObject)); + } + public SOContainer(ISharedObjectContainerConfig config) { + if (config == null) + throw new InstantiationError("config must not be null"); + this.config = config; + groupManager = new SOContainerGMM(this, new Member(config.getID())); + sharedObjectManager = new SOManager(this); + loadingThreadGroup = getLoadingThreadGroup(); + sharedObjectThreadGroup = getSharedObjectThreadGroup(); + listeners = new Vector(); + receiver = new MessageReceiver(); + } + protected ISynchAsynchConnectionEventHandler getReceiver() { + return receiver; + } + protected boolean isClosing() { + return isClosing; + } + protected void setIsClosing() { + isClosing = true; + } + protected ThreadGroup getLoadingThreadGroup() { + return new ThreadGroup(getID() + ":Loading"); + } + protected ThreadGroup getSharedObjectThreadGroup() { + return new ThreadGroup(getID() + ":SOs"); + } + + public ID getID() { + return config.getID(); + } + + protected int getMaxGroupMembers() { + return groupManager.getMaxMembers(); + } + protected void setMaxGroupMembers(int max) { + groupManager.setMaxMembers(max); + } + + protected void notifySharedObjectActivated(ID sharedObjectID) { + groupManager.notifyOthersActivated(sharedObjectID); + } + protected void notifySharedObjectDeactivated(ID sharedObjectID) { + groupManager.notifyOthersDeactivated(sharedObjectID); + } + + protected boolean destroySharedObject(ID sharedObjectID) { + return groupManager.removeSharedObject(sharedObjectID); + } + + protected IOSGIService getOSGIServiceInterface() { + return null; + } + protected void sendCreate(ID sharedObjectID, ID toContainerID, + SharedObjectDescription sd) throws IOException { + sendCreateSharedObjectMessage(toContainerID, sd); + } + protected void sendDispose(ID toContainerID, ID sharedObjectID) + throws IOException { + sendDisposeSharedObjectMessage(toContainerID, sharedObjectID); + } + protected void sendMessage(ID toContainerID, ID sharedObjectID, + Object message) throws IOException { + if (message == null) + return; + if (message instanceof Serializable) + throw new NotSerializableException(message.getClass().getName()); + sendSharedObjectMessage(toContainerID, sharedObjectID, + (Serializable) message); + } + protected void sendCreateResponse(ID homeID, ID sharedObjectID, + Throwable t, long identifier) throws IOException { + sendCreateResponseSharedObjectMessage(homeID, sharedObjectID, t, + identifier); + } + protected Thread getNewSharedObjectThread(ID sharedObjectID, + Runnable runnable) { + return new Thread(sharedObjectThreadGroup, runnable, getID().getName() + + ";" + sharedObjectID.getName()); + } + protected ISharedObject load(SharedObjectDescription sd) throws Exception { + return sharedObjectManager.loadSharedObject(sd); + } + protected ID[] getSharedObjectIDs() { + return groupManager.getSharedObjectIDs(); + } + protected SOConfig makeSharedObjectConfig(SharedObjectDescription sd, + ISharedObject obj) { + return new SOConfig(sd.getID(), sd.getHomeID(), this, sd + .getProperties()); + } + protected void moveFromLoadingToActive(SOWrapper wrap) { + groupManager.moveSharedObjectFromLoadingToActive(wrap); + } + protected void removeFromLoading(ID id) { + groupManager.removeSharedObjectFromLoading(id); + } + + protected void processDisconnect(DisconnectConnectionEvent e) { + // XXX TODO + } + protected void processAsynch(AsynchConnectionEvent e) { + // XXX TODO + } + protected Serializable processSynch(SynchConnectionEvent e) + throws IOException { + // XXX TODO + return null; + } + class LoadingSharedObject implements ISharedObject { + + SharedObjectDescription description; + Thread runner = null; + + LoadingSharedObject(SharedObjectDescription sd) { + this.description = sd; + } + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#init(org.eclipse.ecf.core.ISharedObjectConfig) + */ + public void init(ISharedObjectConfig initData) + throws SharedObjectInitException { + } + ID getID() { + return description.getID(); + } + + ID getHomeID() { + return description.getHomeID(); + } + + void start() { + if (runner == null) { + runner = (Thread) AccessController + .doPrivileged(new PrivilegedAction() { + public Object run() { + return getThread(); + } + }); + runner.setDaemon(true); + runner.start(); + } + + } + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#handleEvent(org.eclipse.ecf.core.util.Event) + */ + public void handleEvent(Event event) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[]) + */ + public void handleEvents(Event[] events) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID) + */ + public void dispose(ID containerID) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class clazz) { + return null; + } + Thread getThread() { + return new Thread(loadingThreadGroup, new Runnable() { + public void run() { + try { + if (Thread.currentThread().isInterrupted() + || isClosing()) + throw new InterruptedException( + "Loading interrupted for object " + + getID().getName()); + // First load given object + ISharedObject obj = load(description); + // Get config info for new object + SOConfig aConfig = makeSharedObjectConfig(description, + obj); + // Call init method on new object. + obj.init(aConfig); + // Check to make sure thread has not been + // interrupted...if it has, throw + if (Thread.currentThread().isInterrupted() + || isClosing()) + throw new InterruptedException( + "Loading interrupted for object " + + getID().getName()); + + // Create meta object and move from loading to active + // list. + SOContainer.this.moveFromLoadingToActive(new SOWrapper( + aConfig, obj, SOContainer.this)); + } catch (Exception e) { + SOContainer.this.removeFromLoading(getID()); + try { + sendCreateResponse(getHomeID(), getID(), e, + description.getIdentifier()); + } catch (Exception e1) { + } + } + } + }, "LRunner" + getID().getName()); + } + + } + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#getConfig() + */ + public ISharedObjectContainerConfig getConfig() { + return config; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#addListener(org.eclipse.ecf.core.ISharedObjectContainerListener, + * java.lang.String) + */ + public void addListener(ISharedObjectContainerListener l, String filter) { + synchronized (listeners) { + listeners.add(l); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#removeListener(org.eclipse.ecf.core.ISharedObjectContainerListener) + */ + public void removeListener(ISharedObjectContainerListener l) { + synchronized (listeners) { + listeners.remove(l); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#dispose(long) + */ + public void dispose(long waittime) { + isClosing = true; + // XXX Notify listeners that we're going away + // Clear group manager + groupManager.removeAllMembers(); + // Clear shared object manager + sharedObjectManager.dispose(); + try { + synchronized (this) { + wait(waittime); + } + } catch (InterruptedException e) { + + } + sharedObjectThreadGroup.interrupt(); + loadingThreadGroup.interrupt(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#joinGroup(org.eclipse.ecf.core.identity.ID, + * java.lang.Object) + */ + public abstract void joinGroup(ID groupID, Object loginData) + throws SharedObjectContainerJoinException; + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#leaveGroup() + */ + public abstract void leaveGroup(); + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#getGroupID() + */ + public abstract ID getGroupID(); + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#getGroupMemberIDs() + */ + public ID[] getGroupMemberIDs() { + return groupManager.getMemberIDs(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#isGroupManager() + */ + public abstract boolean isGroupManager(); + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#isGroupServer() + */ + public abstract boolean isGroupServer(); + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#getSharedObjectManager() + */ + public ISharedObjectManager getSharedObjectManager() { + return sharedObjectManager; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class adapter) { + return null; + } + + protected ClassLoader getClassLoaderForContainer() { + return this.getClass().getClassLoader(); + } + /** + * @param sd + * @return + */ + protected ClassLoader getClassLoaderForSharedObject( + SharedObjectDescription sd) { + if (sd != null) { + ClassLoader cl = sd.getClassLoader(); + if (cl != null) + return cl; + else + return getClassLoaderForContainer(); + } else + return getClassLoaderForContainer(); + } + /** + * @param sd + * @return + */ + public Object[] getArgsFromProperties(SharedObjectDescription sd) { + if (sd == null) + return null; + Map aMap = sd.getProperties(); + if (aMap == null) + return null; + Object obj = aMap.get(DEFAULT_OBJECT_ARG_KEY); + if (obj == null) + return null; + if (obj instanceof Object[]) { + Object[] ret = (Object[]) obj; + aMap.remove(DEFAULT_OBJECT_ARG_KEY); + return ret; + } else + return null; + } + /** + * @param sd + * @return + */ + public String[] getArgTypesFromProperties(SharedObjectDescription sd) { + if (sd == null) + return null; + Map aMap = sd.getProperties(); + if (aMap == null) + return null; + Object obj = aMap.get(DEFAULT_OBJECT_ARGTYPES_KEY); + if (obj == null) + return null; + if (obj instanceof String[]) { + String[] ret = (String[]) obj; + aMap.remove(DEFAULT_OBJECT_ARGTYPES_KEY); + return ret; + } else + return null; + } + + class MessageReceiver implements ISynchAsynchConnectionEventHandler { + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.internal.comm.ISynchConnectionEventHandler#handleSynchEvent(org.eclipse.ecf.internal.comm.SynchConnectionEvent) + */ + public Object handleSynchEvent(SynchConnectionEvent event) + throws IOException { + return processSynch(event); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#handleSuspectEvent(org.eclipse.ecf.internal.comm.ConnectionEvent) + */ + public boolean handleSuspectEvent(ConnectionEvent event) { + return false; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#handleDisconnectEvent(org.eclipse.ecf.internal.comm.ConnectionEvent) + */ + public void handleDisconnectEvent(DisconnectConnectionEvent event) { + processDisconnect(event); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class clazz) { + return null; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.internal.comm.IAsynchConnectionEventHandler#handleAsynchEvent(org.eclipse.ecf.internal.comm.AsynchConnectionEvent) + */ + public void handleAsynchEvent(AsynchConnectionEvent event) + throws IOException { + processAsynch(event); + } + + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerConfig.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerConfig.java new file mode 100644 index 000000000..dd493c2b7 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerConfig.java @@ -0,0 +1,49 @@ +package org.eclipse.ecf.provider.generic; + +import java.util.HashMap; +import java.util.Map; + +import org.eclipse.ecf.core.ISharedObjectContainerConfig; +import org.eclipse.ecf.core.identity.ID; + +public class SOContainerConfig implements ISharedObjectContainerConfig { + + ID id; + Map properties; + + public SOContainerConfig(ID id, Map props) { + this.id = id; + this.properties = props; + } + public SOContainerConfig(ID id) { + this.id = id; + this.properties = new HashMap(); + } + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainerConfig#getProperties() + */ + public Map getProperties() { + return properties; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainerConfig#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class clazz) { + return null; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.IIdentifiable#getID() + */ + public ID getID() { + return id; + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGMM.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGMM.java new file mode 100644 index 000000000..6020c10fa --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGMM.java @@ -0,0 +1,323 @@ +package org.eclipse.ecf.provider.generic; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Observable; +import java.util.Observer; +import java.util.TreeMap; + +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.provider.generic.gmm.GMMImpl; +import org.eclipse.ecf.provider.generic.gmm.Member; +import org.eclipse.ecf.provider.generic.gmm.MemberChanged; + +class SOContainerGMM implements Observer { + + public static boolean DEBUG = false; + + SOContainer container; + Member localMember; + GMMImpl groupManager; + // Maximum number of members. Default is -1 (no maximum). + int maxMembers = -1; + TreeMap loading, active; + + SOContainerGMM(SOContainer space, Member local) { + container = space; + groupManager = new GMMImpl(); + groupManager.addObserver(this); + loading = new TreeMap(); + active = new TreeMap(); + localMember = local; + addMember(local); + } + + ID[] getSharedObjectIDs() { + return getActiveKeys(); + } + synchronized boolean addMember(Member m) { + if (maxMembers > 0 && getSize() > maxMembers) { + return false; + } else { + return groupManager.addMember(m); + } + } + synchronized int setMaxMembers(int max) { + int old = maxMembers; + maxMembers = max; + return old; + } + synchronized int getMaxMembers() { + return maxMembers; + } + + synchronized boolean removeMember(Member m) { + boolean res = groupManager.removeMember(m); + if (res) { + removeSharedObjects(m); + } + return res; + } + + synchronized boolean removeMember(ID id) { + Member m = getMemberForID(id); + if (m == null) + return false; + return removeMember(m); + } + + void removeAllMembers() { + removeAllMembers(null); + } + + void removeNonLocalMembers() { + removeAllMembers(localMember); + } + + synchronized void removeAllMembers(Member exception) { + Object m[] = getMembers(); + for (int i = 0; i < m.length; i++) { + Member mem = (Member) m[i]; + if (exception == null || !exception.equals(mem)) + removeMember(mem); + } + } + + synchronized Object[] getMembers() { + return groupManager.getMembers(); + } + + synchronized ID[] getOtherMemberIDs() { + return groupManager.getMemberIDs(localMember.getID()); + } + + synchronized ID[] getMemberIDs() { + return groupManager.getMemberIDs(null); + } + + synchronized Member getMemberForID(ID id) { + Member newMem = new Member(id); + for (Iterator i = iterator(); i.hasNext();) { + Member oldMem = (Member) i.next(); + if (newMem.equals(oldMem)) + return oldMem; + } + return null; + } + + synchronized int getSize() { + return groupManager.getSize(); + } + + synchronized boolean containsMember(Member m) { + return groupManager.containsMember(m); + } + + synchronized Iterator iterator() { + return groupManager.iterator(); + } + + // End group membership change methods + + synchronized boolean addRepObj(SOWrapper ro) { + if (getFromAny(ro.getObjID()) != null) + return false; + addSharedObjectToActive(ro); + return true; + } + + synchronized boolean addLoadingSharedObject( + SOContainer.LoadingSharedObject lso) { + if (getFromAny(lso.getID()) != null) + return false; + loading.put(lso.getID(), new SOWrapper(lso, container)); + // And start the thing + lso.start(); + return true; + } + + synchronized void moveSharedObjectFromLoadingToActive(SOWrapper ro) { + if (removeSharedObjectFromLoading(ro.getObjID())) + addSharedObjectToActive(ro); + } + + boolean removeSharedObjectFromLoading(ID id) { + if (loading.remove(id) != null) { + return true; + } else + return false; + } + + synchronized ID[] getActiveKeys() { + return (ID[]) active.keySet().toArray(new ID[0]); + } + + void addSharedObjectToActive(SOWrapper so) { + ID[] ids = getActiveKeys(); + active.put(so.getObjID(), so); + so.activated(ids); + } + + synchronized void notifyOthersActivated(ID id) { + notifyOtherChanged(id, active, true); + } + + synchronized void notifyOthersDeactivated(ID id) { + notifyOtherChanged(id, active, false); + } + + void notifyOtherChanged(ID id, TreeMap aMap, boolean activated) { + for (Iterator i = aMap.values().iterator(); i.hasNext();) { + SOWrapper other = (SOWrapper) i.next(); + if (!id.equals(other.getObjID())) { + other.otherChanged(id, activated); + } + } + } + + synchronized boolean removeSharedObject(ID id) { + SOWrapper ro = removeFromMap(id, active); + if (ro == null) + return false; + ro.deactivated(); + return true; + } + + synchronized SOWrapper getFromMap(ID objID, TreeMap aMap) { + return (SOWrapper) aMap.get(objID); + } + + synchronized SOWrapper removeFromMap(ID objID, TreeMap aMap) { + return (SOWrapper) aMap.remove(objID); + } + + SOWrapper getFromLoading(ID objID) { + return getFromMap(objID, loading); + } + + SOWrapper getFromActive(ID objID) { + return getFromMap(objID, active); + } + + synchronized SOWrapper getFromAny(ID objID) { + SOWrapper ro = getFromMap(objID, active); + if (ro != null) + return ro; + ro = getFromMap(objID, loading); + return ro; + } + + // Notification methods + void notifyAllOfMemberChange(Member m, TreeMap map, boolean add) { + for (Iterator i = map.values().iterator(); i.hasNext();) { + SOWrapper ro = (SOWrapper) i.next(); + ro.memberChanged(m, add); + } + } + + public void update(Observable o, Object arg) { + MemberChanged mc = (MemberChanged) arg; + notifyAllOfMemberChange(mc.getMember(), active, mc.getAdded()); + } + + synchronized void removeSharedObjects(Member m) { + removeSharedObjects(m, true); + } + + synchronized void clear() { + removeSharedObjects(null, true); + } + + void removeSharedObjects(Member m, boolean match) { + HashSet set = getRemoveIDs(m.getID(), match); + Iterator i = set.iterator(); + + while (i.hasNext()) { + ID removeID = (ID) i.next(); + if (isLoading(removeID)) { + removeSharedObjectFromLoading(removeID); + } else { + container.destroySharedObject(removeID); + } + } + } + + HashSet getRemoveIDs(ID homeID, boolean match) { + HashSet aSet = new HashSet(); + for (Iterator i = new DestroyIterator(loading, homeID, match); i + .hasNext();) { + aSet.add(i.next()); + } + for (Iterator i = new DestroyIterator(active, homeID, match); i + .hasNext();) { + aSet.add(i.next()); + } + return aSet; + } + + synchronized boolean isActive(ID id) { + return active.containsKey(id); + } + + synchronized boolean isLoading(ID id) { + return loading.containsKey(id); + } + + boolean debug() { + return DEBUG; + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("RSM["); + sb.append(groupManager); + sb.append(";L:").append(loading); + sb.append(";A:").append(active).append("]"); + return sb.toString(); + } + +} + +class DestroyIterator implements Iterator { + ID next; + ID homeID; + Iterator i; + boolean match; + + public DestroyIterator(TreeMap map, ID hID, boolean m) { + i = map.values().iterator(); + homeID = hID; + next = null; + match = m; + } + + public boolean hasNext() { + if (next == null) + next = getNext(); + return (next != null); + } + + public Object next() { + if (hasNext()) { + ID value = next; + next = null; + return value; + } else { + throw new java.util.NoSuchElementException(); + } + } + + ID getNext() { + while (i.hasNext()) { + SOWrapper ro = (SOWrapper) i.next(); + if (homeID == null || (match ^ !ro.getHomeID().equals(homeID))) { + return ro.getObjID(); + } + } + return null; + } + + public void remove() { + throw new UnsupportedOperationException(); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGroup.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGroup.java new file mode 100644 index 000000000..01eb178be --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainerGroup.java @@ -0,0 +1,45 @@ +package org.eclipse.ecf.provider.generic; + +import java.util.TreeMap; +import java.util.Collections; +import java.util.Map; +import java.util.Iterator; + +public class SOContainerGroup { + + String name; + protected Map map; + + public SOContainerGroup(String name) { + this.name = name; + map = Collections.synchronizedMap(new TreeMap()); + } + public String add(String key, SOContainer aSpace) { + if (key == null || aSpace == null) + return null; + map.put(key, aSpace); + return key; + } + + public SOContainer get(String key) { + if (key == null) + return null; + return (SOContainer) map.get(key); + } + public SOContainer remove(String key) { + if (key == null) + return null; + return (SOContainer) map.remove(key); + } + public boolean contains(String key) { + if (key == null) + return false; + return map.containsKey(key); + } + public String getName() { + return name; + } + public Iterator elements() { + return map.values().iterator(); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContext.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContext.java new file mode 100644 index 000000000..8501bd105 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContext.java @@ -0,0 +1,218 @@ +/* + * Created on Dec 6, 2004 + * + */ +package org.eclipse.ecf.provider.generic; + +import java.io.IOException; +import java.util.Map; + +import org.eclipse.ecf.core.IOSGIService; +import org.eclipse.ecf.core.ISharedObjectContext; +import org.eclipse.ecf.core.ISharedObjectManager; +import org.eclipse.ecf.core.SharedObjectContainerJoinException; +import org.eclipse.ecf.core.SharedObjectDescription; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.QueueEnqueue; + +public class SOContext implements ISharedObjectContext { + + SOContainer container = null; + ID sharedObjectID; + ID homeContainerID; + boolean isActive; + Map properties; + QueueEnqueue queue; + + public SOContext(ID objID, ID homeID, SOContainer cont, Map props, + QueueEnqueue queue) { + super(); + this.sharedObjectID = objID; + this.homeContainerID = homeID; + this.container = cont; + this.properties = props; + this.queue = queue; + } + protected synchronized void makeInactive() { + container = null; + properties = null; + queue = null; + } + protected synchronized boolean isInactive() { + return (container == null); + } + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#getContainerID() + */ + public synchronized ID getLocalContainerID() { + if (isInactive()) { + return null; + } + return container.getConfig().getID(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#getSharedObjectManager() + */ + public synchronized ISharedObjectManager getSharedObjectManager() { + if (isInactive()) { + return null; + } + return container.getSharedObjectManager(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#getQueue() + */ + public synchronized QueueEnqueue getQueue() { + if (isInactive()) { + return null; + } + return queue; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#joinGroup(org.eclipse.ecf.core.identity.ID, + * java.lang.Object) + */ + public synchronized void joinGroup(ID groupID, Object loginData) + throws SharedObjectContainerJoinException { + if (isInactive()) { + return; + } else + container.joinGroup(groupID, loginData); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#leaveGroup() + */ + public synchronized void leaveGroup() { + if (isInactive()) { + return; + } else + container.leaveGroup(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#getGroupID() + */ + public synchronized ID getGroupID() { + if (isInactive()) { + return null; + } else + return container.getGroupID(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#isGroupManager() + */ + public synchronized boolean isGroupManager() { + if (isInactive()) { + return false; + } else + return container.isGroupManager(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#isGroupServer() + */ + public synchronized boolean isGroupServer() { + if (isInactive()) { + return false; + } else + return container.isGroupManager(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#getGroupMembership() + */ + public synchronized ID[] getGroupMemberIDs() { + if (isInactive()) { + return null; + } else + return container.getGroupMemberIDs(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#sendCreate(org.eclipse.ecf.core.identity.ID, + * org.eclipse.ecf.core.SharedObjectDescription) + */ + public synchronized void sendCreate(ID toContainerID, + SharedObjectDescription sd) throws IOException { + if (isInactive()) { + return; + } else { + container.sendCreate(sharedObjectID, toContainerID, sd); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#sendDispose(org.eclipse.ecf.core.identity.ID) + */ + public synchronized void sendDispose(ID toContainerID) throws IOException { + if (isInactive()) { + return; + } else { + container.sendDispose(toContainerID, sharedObjectID); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#sendMessage(org.eclipse.ecf.core.identity.ID, + * java.lang.Object) + */ + public void sendMessage(ID toContainerID, Object data) throws IOException { + if (isInactive()) { + return; + } else { + container.sendMessage(toContainerID, sharedObjectID, data); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class clazz) { + return null; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContext#getServiceAccess() + */ + public IOSGIService getServiceAccess() { + if (isInactive()) { + return null; + } else { + return container.getOSGIServiceInterface(); + } + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java new file mode 100644 index 000000000..c8b6b1331 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java @@ -0,0 +1,264 @@ +/* + * Created on Dec 20, 2004 + * + */ +package org.eclipse.ecf.provider.generic; + +import java.lang.reflect.Constructor; +import java.security.AccessController; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.Map; +import java.util.Vector; + +import org.eclipse.ecf.core.ISharedObject; +import org.eclipse.ecf.core.ISharedObjectConnector; +import org.eclipse.ecf.core.ISharedObjectContainerTransaction; +import org.eclipse.ecf.core.ISharedObjectManager; +import org.eclipse.ecf.core.SharedObjectAddException; +import org.eclipse.ecf.core.SharedObjectConnectException; +import org.eclipse.ecf.core.SharedObjectCreateException; +import org.eclipse.ecf.core.SharedObjectDescription; +import org.eclipse.ecf.core.SharedObjectDisconnectException; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.AbstractFactory; +import org.eclipse.ecf.core.util.QueueEnqueue; + +/** + * + */ +public class SOManager implements ISharedObjectManager { + + SOContainer container = null; + Vector connectors = null; + + public SOManager(SOContainer cont) { + super(); + this.container = cont; + connectors = new Vector(); + } + + protected void addConnector(ISharedObjectConnector conn) { + connectors.add(conn); + } + protected boolean removeConnector(ISharedObjectConnector conn) { + return connectors.remove(conn); + } + protected List getConnectors() { + return connectors; + } + protected Class[] getArgTypes(String[] argTypes, Object[] args, + ClassLoader cl) throws ClassNotFoundException { + return AbstractFactory.getClassesForTypes(argTypes, args, cl); + } + + protected ISharedObject makeSharedObjectInstance(final Class newClass, + final Class[] argTypes, final Object[] args) throws Exception { + Object newObject = null; + try { + newObject = AccessController + .doPrivileged(new PrivilegedExceptionAction() { + public Object run() throws Exception { + Constructor aConstructor = newClass + .getConstructor(argTypes); + aConstructor.setAccessible(true); + return aConstructor.newInstance(args); + } + }); + } catch (java.security.PrivilegedActionException e) { + throw e.getException(); + } + return verifySharedObject(newObject); + } + protected ISharedObject verifySharedObject(Object newSharedObject) { + if (newSharedObject instanceof ISharedObject) + return (ISharedObject) newSharedObject; + else + throw new ClassCastException("shared object " + + newSharedObject.toString() + " does not implement " + + ISharedObject.class.getName()); + } + protected ISharedObject loadSharedObject(SharedObjectDescription sd) + throws Exception { + // First get classloader + ClassLoader cl = container.getClassLoaderForSharedObject(sd); + // Then get args array from properties + Object[] args = container.getArgsFromProperties(sd); + // And arg types + String[] types = container.getArgTypesFromProperties(sd); + Class[] argTypes = getArgTypes(types, args, cl); + // Now load top-level class + final Class newClass = Class.forName(sd.getClassname(), true, cl); + return makeSharedObjectInstance(newClass, argTypes, args); + } + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectManager#getSharedObjectIDs() + */ + public ID[] getSharedObjectIDs() { + return container.getSharedObjectIDs(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectManager#createSharedObject(org.eclipse.ecf.core.SharedObjectDescription, + * org.eclipse.ecf.core.ISharedObjectContainerTransaction) + */ + public ID createSharedObject(SharedObjectDescription sd, + ISharedObjectContainerTransaction trans) + throws SharedObjectCreateException { + ISharedObject newObject = null; + Throwable t = null; + ID result = sd.getID(); + try { + newObject = loadSharedObject(sd); + return addSharedObject(result, newObject, sd.getProperties(), trans); + } catch (Exception e) { + throw new SharedObjectCreateException(t); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectManager#addSharedObject(org.eclipse.ecf.core.identity.ID, + * org.eclipse.ecf.core.ISharedObject, java.util.Map, + * org.eclipse.ecf.core.ISharedObjectContainerTransaction) + */ + public ID addSharedObject(ID sharedObjectID, ISharedObject sharedObject, + Map properties, ISharedObjectContainerTransaction trans) + throws SharedObjectAddException { + Throwable t = null; + ID result = sharedObjectID; + try { + ISharedObject so = sharedObject; + SharedObjectDescription sd = new SharedObjectDescription( + sharedObject.getClass().getClassLoader(), sharedObjectID, + container.getID(), sharedObject.getClass().getName(), + properties, 0); + container.addSharedObjectAndWait(sd, so, trans); + } catch (Exception except) { + throw new SharedObjectAddException(except); + } + return result; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectManager#getSharedObject(org.eclipse.ecf.core.identity.ID) + */ + public ISharedObject getSharedObject(ID sharedObjectID) { + return container.getSharedObject(sharedObjectID); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectManager#removeSharedObject(org.eclipse.ecf.core.identity.ID) + */ + public ISharedObject removeSharedObject(ID sharedObjectID) { + return container.removeSharedObject(sharedObjectID); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectManager#connectSharedObjects(org.eclipse.ecf.core.identity.ID, + * org.eclipse.ecf.core.identity.ID[]) + */ + public ISharedObjectConnector connectSharedObjects(ID sharedObjectFrom, + ID[] sharedObjectsTo) throws SharedObjectConnectException { + if (sharedObjectFrom == null) + throw new SharedObjectConnectException("sender cannot be null"); + if (sharedObjectsTo == null) + throw new SharedObjectConnectException("receivers cannot be null"); + ISharedObjectConnector result = null; + synchronized (container.getGroupMembershipLock()) { + // Get from to make sure it's there + SOWrapper wrap = container.getSharedObjectWrapper(sharedObjectFrom); + if (wrap == null) + throw new SharedObjectConnectException("sender object " + + sharedObjectFrom.getName() + " not found"); + QueueEnqueue[] queues = new QueueEnqueue[sharedObjectsTo.length]; + for (int i = 0; i < sharedObjectsTo.length; i++) { + SOWrapper w = container + .getSharedObjectWrapper(sharedObjectsTo[i]); + if (w == null) + throw new SharedObjectConnectException("receiver object " + + sharedObjectsTo[i].getName() + " not found"); + queues[i] = new QueueEnqueueImpl(w.getQueue()); + } + // OK now we've got ids and wrappers, make a connector + result = new SOConnector(sharedObjectFrom, sharedObjectsTo, queues); + } + return result; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectManager#disconnectSharedObjects(org.eclipse.ecf.core.ISharedObjectConnector) + */ + public void disconnectSharedObjects(ISharedObjectConnector connector) + throws SharedObjectDisconnectException { + if (connector == null) + throw new SharedObjectDisconnectException("connect cannot be null"); + if (!removeConnector(connector)) { + throw new SharedObjectDisconnectException("connector " + connector + + " not found"); + } + connector.dispose(); + } + protected void dispose() { + for (Enumeration e = connectors.elements(); e.hasMoreElements();) { + ISharedObjectConnector conn = (ISharedObjectConnector) e + .nextElement(); + conn.dispose(); + } + connectors.clear(); + } + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectManager#getSharedObjectConnectors(org.eclipse.ecf.core.identity.ID) + */ + public List getSharedObjectConnectors(ID sharedObjectFrom) { + List results = new ArrayList(); + for (Enumeration e = connectors.elements(); e.hasMoreElements();) { + ISharedObjectConnector conn = (ISharedObjectConnector) e + .nextElement(); + if (sharedObjectFrom.equals(conn.getSender())) { + results.add(conn); + } + } + return results; + } + + public static Class[] getClassesForTypes(String[] argTypes, Object[] args, + ClassLoader cl) throws ClassNotFoundException { + Class clazzes[] = null; + if (args == null || args.length == 0) + clazzes = new Class[0]; + else if (argTypes != null) { + clazzes = new Class[argTypes.length]; + for (int i = 0; i < argTypes.length; i++) { + clazzes[i] = Class.forName(argTypes[i], true, cl); + } + } else { + clazzes = new Class[args.length]; + for (int i = 0; i < args.length; i++) { + if (args[i] == null) + clazzes[i] = null; + else + clazzes[i] = args[i].getClass(); + } + } + return clazzes; + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java new file mode 100644 index 000000000..93b4a17ce --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java @@ -0,0 +1,294 @@ +package org.eclipse.ecf.provider.generic; + +import java.io.Serializable; +import java.security.AccessController; +import java.security.PrivilegedAction; + +import org.eclipse.ecf.core.ISharedObject; +import org.eclipse.ecf.core.SharedObjectInitException; +import org.eclipse.ecf.core.events.SharedObjectActivatedEvent; +import org.eclipse.ecf.core.events.SharedObjectContainerDepartedEvent; +import org.eclipse.ecf.core.events.SharedObjectContainerJoinedEvent; +import org.eclipse.ecf.core.events.SharedObjectDeactivatedEvent; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.AsynchResult; +import org.eclipse.ecf.core.util.Event; +import org.eclipse.ecf.core.util.SimpleQueueImpl; +import org.eclipse.ecf.provider.Debug; +import org.eclipse.ecf.provider.generic.gmm.Member; + +final class SOWrapper { + static Debug debug = Debug.create(SOWrapper.class.getName()); + + protected ISharedObject sharedObject; + private SOConfig sharedObjectConfig; + private ID sharedObjectID; + private ID sharedObjectHomeID; + private SOContainer container; + private ID containerID; + private Thread thread; + private SimpleQueueImpl queue; + + SOWrapper(SOContainer.LoadingSharedObject obj, SOContainer cont) { + sharedObjectID = obj.getID(); + sharedObjectHomeID = obj.getHomeID(); + sharedObject = obj; + container = cont; + containerID = cont.getID(); + sharedObjectConfig = null; + thread = null; + queue = new SimpleQueueImpl(); + } + SOWrapper(SOConfig aConfig, ISharedObject obj, SOContainer cont) { + sharedObjectConfig = aConfig; + sharedObjectID = sharedObjectConfig.getSharedObjectID(); + sharedObjectHomeID = sharedObjectConfig.getHomeContainerID(); + sharedObject = obj; + container = cont; + containerID = cont.getID(); + thread = null; + queue = new SimpleQueueImpl(); + } + + void init() throws SharedObjectInitException { + sharedObject.init(sharedObjectConfig); + } + ID getObjID() { + return sharedObjectConfig.getSharedObjectID(); + } + + ID getHomeID() { + return sharedObjectConfig.getHomeContainerID(); + } + + void activated(ID[] ids) { + // First, make space reference accessible to use by RepObject + sharedObjectConfig.makeActive(new QueueEnqueueImpl(queue)); + thread = (Thread) AccessController.doPrivileged(new PrivilegedAction() { + public Object run() { + // Get thread instance + Thread aThread = getThread(); + return aThread; + } + }); + thread.start(); + send(new SharedObjectActivatedEvent(containerID, sharedObjectID, ids)); + container.notifySharedObjectActivated(sharedObjectID); + + } + void deactivated() { + send(new SharedObjectDeactivatedEvent(containerID, sharedObjectID)); + container.notifySharedObjectDeactivated(sharedObjectID); + destroyed(); + } + private void destroyed() { + if (!queue.isStopped()) { + sharedObjectConfig.makeInactive(); + // Enqueue destroy message on our RepObject's queue + if (thread != null) + queue.enqueue(new DisposeEvent()); + // Close queue...RepObject will receive no more messages from this + // point on. + queue.close(); + } + + } + void otherChanged(ID otherID, boolean activated) { + if (activated && thread != null) { + send(new SharedObjectActivatedEvent(containerID, otherID, null)); + } else { + send(new SharedObjectDeactivatedEvent(containerID, otherID)); + } + } + void memberChanged(Member m, boolean add) { + if (thread != null) { + if (add) { + send(new SharedObjectContainerJoinedEvent(containerID, m + .getID())); + } else { + send(new SharedObjectContainerDepartedEvent(containerID, m + .getID())); + } + } + } + Thread getThread() { + // Get new thread instance from space. + return container.getNewSharedObjectThread(sharedObjectID, + new Runnable() { + public void run() { + if (Debug.ON && debug != null) { + debug.msg("Starting runner for " + sharedObjectID); + } + // The debug class will associate this thread with + // container + Debug.setThreadDebugGroup(container.getID()); + // Then process messages on queue until interrupted or + // queue closed + //Msg aMsg = null; + Event evt = null; + for (;;) { + // make sure the thread hasn't been interrupted and + // get Msg from SimpleQueueImpl + if (Thread.currentThread().isInterrupted()) + break; + + evt = (Event) queue.dequeue(); + if (Thread.currentThread().isInterrupted() + || evt == null) + break; + + try { + if (evt instanceof ProcEvent) { + SOWrapper.this.svc(((ProcEvent) evt) + .getEvent()); + } else if (evt instanceof DisposeEvent) { + SOWrapper.this.doDestroy(); + } + } catch (Throwable t) { + if (Debug.ON && debug != null) { + debug.dumpStack(t, + "Exception executing event " + evt + + " on meta " + this); + } + handleRuntimeException(t); + } + } + // If the thread was interrupted, then show appropriate + // spam + if (Thread.currentThread().isInterrupted()) { + if (Debug.ON && debug != null) { + debug + .msg("Runner for " + + sharedObjectID + + " terminating after being interrupted"); + } + } else { + if (Debug.ON && debug != null) { + debug.msg("Runner for " + sharedObjectID + + " terminating normally"); + } + } + } + }); + } + private void send(Event evt) { + queue.enqueue(new ProcEvent(evt)); + } + + protected static class ProcEvent implements Event { + Event theEvent = null; + ProcEvent(Event event) { + theEvent = event; + } + Event getEvent() { + return theEvent; + } + } + protected static class DisposeEvent implements Event { + DisposeEvent() { + } + } + void svc(Event evt) { + sharedObject.handleEvent(evt); + } + void doDestroy() { + sharedObject.dispose(containerID); + } + + //void createMsgResp(ID fromID, ContainerMessage.CreateResponse resp) { + /* + * if (sharedObjectConfig.getMsgMask().get(MsgMask.CREATERESPONSE) && thread != + * null) { send( Msg.makeMsg( null, CREATE_RESP_RCVD, fromID, resp.myExcept, + * new Long(resp.mySeq))); } + */ + //} + void deliverObjectFromRemote(ID fromID, Serializable data) { + // If we have a container, forward message onto container + /* + * if (myContainerID != null) { forwardToContainer( Msg.makeMsg(null, + * REMOTE_REPOBJ_MSG, fromID, data)); // otherwise, send to our object + * (assuming it has thread and that it wants to receive message) } else + * if ( sharedObjectConfig.getMsgMask().get(MsgMask.REMOTEDATA) && + * thread != null) { send(Msg.makeMsg(null, REMOTE_REPOBJ_MSG, fromID, + * data)); } + */ + } + + void forwardToContainer(Event msg) { + /* + * try { container.deliverForwardToRepObject(sharedObjectID, + * myContainerID, msg); } catch (Exception e) { + * handleRuntimeException(e); } + */ + } + void deliverEventFromSharedObject(ID fromID, Event evt) { + /* + * if (myContainerID != null) { forwardToContainer(Msg.makeMsg(null, + * REPOBJ_MSG, fromID, msg)); // otherwise, send to our object (assuming + * it has thread and that it wants to receive message) } else if ( + * sharedObjectConfig.getMsgMask().get(MsgMask.REPOBJMSG) && thread != + * null) { send(Msg.makeMsg(null, REPOBJ_MSG, fromID, msg)); } + */ + } + void deliverRequestFromRepObject(ID fromID, Event evt, AsynchResult future) { + /* + * if (myContainerID != null) { forwardToContainer( Msg.makeMsg(null, + * REPOBJ_REQ, fromID, msg, future)); } else if ( + * sharedObjectConfig.getMsgMask().get(MsgMask.REPOBJMSG) && thread != + * null) { // Check to see that messages may be received...determined by + * the REPOBJMSG // bit in msg mask send(Msg.makeMsg(null, REPOBJ_REQ, + * fromID, msg, future)); } + */ + } + void deliverForwardedMsg(ID fromID, Event evt) { + /* + * if (myContainerID != null) { forwardToContainer(Msg.makeMsg(null, + * REPOBJ_FOR, fromID, msg)); } else if ( + * sharedObjectConfig.getMsgMask().get(MsgMask.REPOBJMSG) && thread != + * null) { send(Msg.makeMsg(null, REPOBJ_FOR, fromID, msg)); } + */ + } + void deliverRemoteMessageFailed(ID toID, Serializable object, Throwable e) { + /* + * if (sharedObjectConfig.getMsgMask().get(MsgMask.REPOBJMSG) && thread != + * null) { send(Msg.makeMsg(null, REMOTE_REPOBJ_MSG_FAILED, toID, + * object, e)); } + */ + } + + void destroySelf() { + /* + * if (thread != null) { send(Msg.makeMsg(null, REPOBJ_DESTROY_SELF)); } + */ + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("SharedObjectWrapper[").append(getObjID()).append("]"); + return sb.toString(); + } + void handleRuntimeException(Throwable except) { + if (Debug.ON && debug != null) { + debug.dumpStack(except, "handleRuntimeException called for " + + sharedObjectID); + } + try { + Debug.errDumpStack(except, "handleRuntimeException called for " + + sharedObjectID); + } catch (Throwable e) { + } + } + /** + * @return + */ + protected ISharedObject getSharedObject() { + return sharedObject; + } + /** + * @return + */ + public SimpleQueueImpl getQueue() { + return queue; + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java new file mode 100644 index 000000000..0c3e9c5d0 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java @@ -0,0 +1,230 @@ +package org.eclipse.ecf.provider.generic; + +import java.io.IOException; +import java.io.InvalidObjectException; +import java.io.Serializable; +import java.net.ConnectException; +import java.net.Socket; + +import org.eclipse.ecf.core.ISharedObjectContainerConfig; +import org.eclipse.ecf.core.SharedObjectContainerJoinException; +import org.eclipse.ecf.core.comm.IAsynchConnection; +import org.eclipse.ecf.core.comm.ISynchAsynchConnection; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.provider.generic.gmm.Member; + +public class ServerSOContainer extends SOContainer { + + public ServerSOContainer(ISharedObjectContainerConfig config) { + super(config); + } + public boolean isGroupServer() { + return true; + } + public boolean isGroupManager() { + return true; + } + public ID getGroupID() { + return getID(); + } + protected void queueContainerMessage(ContainerMessage message) + throws IOException { + if (message.getToContainerID() == null) { + queueToAll(message); + } else { + IAsynchConnection conn = getConnectionForID(message + .getToContainerID()); + if (conn != null) + conn.sendAsynch(message.getToContainerID(), + getBytesForObject(message)); + } + } + protected void forwardToRemote(ID from, ID to, ContainerMessage data) + throws IOException { + queueContainerMessage(new ContainerMessage(from, to, + getNextSequenceNumber(), data.getData())); + } + protected void forwardExcluding(ID from, ID excluding, ContainerMessage data) + throws IOException { + if (excluding == null) { + queueContainerMessage(new ContainerMessage(from, null, + getNextSequenceNumber(), data.getData())); + } else { + Object ms[] = groupManager.getMembers(); + for (int i = 0; i < ms.length; i++) { + Member m = (Member) ms[i]; + ID oldID = m.getID(); + if (!excluding.equals(oldID) && !from.equals(oldID)) { + IAsynchConnection conn = (IAsynchConnection) m.getData(); + if (conn != null) { + try { + conn.sendAsynch(oldID, + getBytesForObject(new ContainerMessage( + from, oldID, + getNextSequenceNumber(), data + .getData()))); + } catch (IOException e) { + // XXX log this + } + } + } + } + } + } + protected void handleChangeMsg(ID fromID, ID toID, long seqNum, + Serializable data) throws IOException { + // Server should never receive change messages + } + + public void joinGroup(ID group, Object data) + throws SharedObjectContainerJoinException { + SharedObjectContainerJoinException e = new SharedObjectContainerJoinException( + "Server cannot join group " + group.getName()); + throw e; + } + public void leaveGroup() { + ejectAllGroupMembers(); + } + + protected ContainerMessage acceptNewClient(Socket socket, String target, + Serializable data, ISynchAsynchConnection conn) { + try { + ContainerMessage mess = (ContainerMessage) data; + if (mess == null) + throw new InvalidObjectException("container message is null"); + ID remoteID = mess.getFromContainerID(); + if (remoteID == null) + throw new InvalidObjectException("remote id is null"); + + ContainerMessage.JoinGroupMessage jgm = (ContainerMessage.JoinGroupMessage) mess + .getData(); + if (jgm == null) + throw new IOException("join group message is null"); + ID memberIDs[] = null; + + synchronized (getGroupMembershipLock()) { + if (isClosing) { + Exception e = new InvalidObjectException( + "container is closing"); + throw e; + } + + if (addNewRemoteMember(remoteID, conn)) { + // Notify existing remotes about new member + try { + forwardExcluding(getID(), remoteID, ContainerMessage + .makeViewChangeMessage(getID(), remoteID, + getNextSequenceNumber(), + new ID[] { remoteID }, true, null)); + } catch (IOException e) { + } + // Get current membership + memberIDs = groupManager.getMemberIDs(); + // Start messaging to new member + conn.start(); + } else { + ConnectException e = new ConnectException( + "server refused connection"); + throw e; + } + } + return ContainerMessage.makeViewChangeMessage(getID(), remoteID, + getNextSequenceNumber(), memberIDs, true, null); + } catch (Exception e) { + // XXX Log this + + // And then return null...which means refusal + return null; + } + } + + protected Serializable getConnectDataFromInput(Serializable input) + throws Exception { + return input; + } + + protected Object checkJoin(String hostname, ID id, Serializable data) + throws Exception { + return null; + } + + protected void memberLeave(ID leaveID, IAsynchConnection conn) { + if (removeRemoteMember(leaveID)) { + try { + forwardExcluding(getID(), leaveID, ContainerMessage + .makeViewChangeMessage(getID(), leaveID, + getNextSequenceNumber(), new ID[] { leaveID }, + false, null)); + } catch (IOException e) { + } + } + killConnection(conn); + } + + public void ejectGroupMember(ID memberID) { + IAsynchConnection conn = null; + synchronized (getGroupMembershipLock()) { + conn = getConnectionForID(memberID); + if (conn == null) + return; + try { + conn.sendAsynch( + + memberID, getBytesForObject(ContainerMessage + .makeLeaveGroupMessage(getID(), memberID, + getNextSequenceNumber(), null))); + } catch (Exception e) { + } + memberLeave(memberID, conn); + } + } + + public void ejectAllGroupMembers() { + synchronized (getGroupMembershipLock()) { + Object[] members = groupManager.getMembers(); + for (int i = 0; i < members.length; i++) { + ejectGroupMember(((Member) members[i]).getID()); + } + } + } + + // Support methods + protected ID getIDForConnection(IAsynchConnection conn) { + Object ms[] = groupManager.getMembers(); + for (int i = 0; i < ms.length; i++) { + Member m = (Member) ms[i]; + if (conn == (IAsynchConnection) m.getData()) + return m.getID(); + } + return null; + } + protected IAsynchConnection getConnectionForID(ID memberID) { + Member mem = groupManager.getMemberForID(memberID); + if (mem == null) + return null; + return (IAsynchConnection) mem.getData(); + } + + private final void queueToAll(ContainerMessage message) { + Object[] members = groupManager.getMembers(); + for (int i = 0; i < members.length; i++) { + IAsynchConnection conn = (IAsynchConnection) ((Member) members[i]) + .getData(); + if (conn != null) { + try { + conn.sendAsynch(message.getToContainerID(), + getBytesForObject(message)); + } catch (IOException e) { + // XXX report + } + } + } + } + + public void dispose(long timeout) { + // For servers, we'll eject all members + ejectAllGroupMembers(); + super.dispose(timeout); + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java new file mode 100644 index 000000000..597cdb0ae --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java @@ -0,0 +1,43 @@ +package org.eclipse.ecf.provider.generic; + +import org.eclipse.ecf.core.ISharedObjectContainerConfig; +import org.eclipse.ecf.core.comm.ConnectionFactory; +import org.eclipse.ecf.core.comm.ConnectionInstantiationException; +import org.eclipse.ecf.core.comm.ISynchAsynchConnection; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.identity.IDFactory; + +public class TCPClientSOContainer extends ClientSOContainer { + int keepAlive = 0; + + public static final String DEFAULT_COMM_NAME = "tcpclient"; + + public TCPClientSOContainer(ISharedObjectContainerConfig config) { + super(config); + } + + public TCPClientSOContainer(ISharedObjectContainerConfig config, int ka) { + super(config); + keepAlive = ka; + } + + protected ISynchAsynchConnection getClientConnection(ID remoteSpace, + Object data) throws ConnectionInstantiationException { + + Object[] args = { new Integer(keepAlive) }; + ISynchAsynchConnection conn = null; + conn = ConnectionFactory.makeSynchAsynchConnection(receiver, DEFAULT_COMM_NAME, args); + return conn; + } + + public static final void main(String[] args) throws Exception { + ISharedObjectContainerConfig config = new SOContainerConfig(IDFactory + .makeGUID()); + TCPClientSOContainer container = new TCPClientSOContainer(config); + // now join group + ID serverID = IDFactory.makeStringID(TCPServerSOContainer + .getDefaultServerURL()); + container.joinGroup(serverID, null); + Thread.sleep(200000); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainer.java new file mode 100644 index 000000000..586a9d1b9 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainer.java @@ -0,0 +1,97 @@ +package org.eclipse.ecf.provider.generic; + +import java.io.IOException; +import java.io.Serializable; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; + +import org.eclipse.ecf.core.ISharedObjectContainerConfig; +import org.eclipse.ecf.core.comm.ConnectionRequestHandler; +import org.eclipse.ecf.core.comm.ISynchAsynchConnection; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.identity.IDFactory; + +public class TCPServerSOContainer extends ServerSOContainer implements + ConnectionRequestHandler { + + public static final String DEFAULT_PROTOCOL = "ecftcp"; + public static final int DEFAULT_PORT = 3282; + public static final int DEFAULT_KEEPALIVE = 30000; + public static final String DEFAULT_NAME = "server"; + // Keep alive value + protected int keepAlive; + protected TCPServerSOContainerGroup group; + + protected int getKeepAlive() { + return keepAlive; + } + public static String getServerURL(String host, String name) { + return DEFAULT_PROTOCOL + "://" + host + ":" + DEFAULT_PORT + "/" + + name; + } + public static String getDefaultServerURL() { + return getServerURL("localhost", DEFAULT_NAME); + } + public TCPServerSOContainer(ISharedObjectContainerConfig config, + TCPServerSOContainerGroup grp, int keepAlive) throws IOException, + URISyntaxException { + super(config); + this.keepAlive = keepAlive; + // Make sure URI syntax is followed. + URI aURI = new URI(config.getID().getName()); + int urlPort = aURI.getPort(); + if (group == null) { + this.group = new TCPServerSOContainerGroup(urlPort); + this.group.putOnTheAir(); + } else + this.group = grp; + String path = aURI.getPath(); + group.add(path, this); + } + + public TCPServerSOContainer(ISharedObjectContainerConfig config, + TCPServerSOContainerGroup listener, String path, int keepAlive) { + super(config); + initialize(listener, path, keepAlive); + } + protected void initialize(TCPServerSOContainerGroup listener, String path, + int keepAlive) { + this.keepAlive = keepAlive; + this.group = listener; + this.group.add(path, this); + } + public void dispose(long timeout) { + URI aURI = null; + try { + aURI = new URI(getID().getName()); + } catch (Exception e) { + // Should never happen + } + if (aURI != null) + group.remove(aURI.getPath()); + group = null; + super.dispose(timeout); + } + public TCPServerSOContainer(ISharedObjectContainerConfig config) + throws IOException, URISyntaxException { + this(config, null, DEFAULT_PORT); + } + public Serializable checkConnect(Socket socket, String target, + Serializable data, ISynchAsynchConnection conn) { + return acceptNewClient(socket, target, data, conn); + } + + protected Serializable getConnectDataFromInput(Serializable input) + throws Exception { + return input; + } + + public static void main(String[] args) throws Exception { + ID server = IDFactory.makeStringID(getDefaultServerURL()); + TCPServerSOContainer cont = new TCPServerSOContainer( + new SOContainerConfig(server)); + + Thread.sleep(3000000); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java new file mode 100644 index 000000000..865ff914f --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java @@ -0,0 +1,140 @@ +package org.eclipse.ecf.provider.generic; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InvalidObjectException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.net.Socket; +import java.net.URI; + +import org.eclipse.ecf.provider.Debug; +import org.eclipse.ecf.core.comm.ConnectionRequestHandler; +import org.eclipse.ecf.provider.comm.tcp.Client; +import org.eclipse.ecf.provider.comm.tcp.ConnectRequestMessage; +import org.eclipse.ecf.provider.comm.tcp.ConnectResultMessage; +import org.eclipse.ecf.provider.comm.tcp.ExObjectInputStream; +import org.eclipse.ecf.provider.comm.tcp.ExObjectOutputStream; +import org.eclipse.ecf.provider.comm.tcp.ISocketAcceptHandler; +import org.eclipse.ecf.provider.comm.tcp.Server; + +public class TCPServerSOContainerGroup extends SOContainerGroup implements + ISocketAcceptHandler { + + public static final String INVALID_CONNECT = "Invalid connect request. "; + + public static final Debug debug = Debug + .create(TCPServerSOContainerGroup.class.getName()); + public static final String DEFAULT_GROUP_NAME = TCPServerSOContainerGroup.class + .getName(); + + protected int port; + Server listener; + boolean isOnTheAir = false; + ThreadGroup threadGroup; + + public TCPServerSOContainerGroup(String name, ThreadGroup group, int port) { + super(name); + threadGroup = group; + this.port = port; + } + + public TCPServerSOContainerGroup(String name, int port) { + this(name, null, port); + } + + public TCPServerSOContainerGroup(int port) { + this(DEFAULT_GROUP_NAME, null, port); + } + + public synchronized void putOnTheAir() throws IOException { + if (Debug.ON && debug != null) { + debug.msg("Putting group " + this + " on the air."); + } + listener = new Server(threadGroup, port, this); + port = listener.getLocalPort(); + isOnTheAir = true; + } + + public synchronized boolean isOnTheAir() { + return isOnTheAir; + } + + public void handleAccept(Socket aSocket) throws Exception { + ObjectOutputStream oStream = new ExObjectOutputStream( + new BufferedOutputStream(aSocket.getOutputStream())); + oStream.flush(); + + ObjectInputStream iStream = new ExObjectInputStream(aSocket + .getInputStream()); + + ConnectRequestMessage req = (ConnectRequestMessage) iStream + .readObject(); + if (Debug.ON && debug != null) { + debug.msg("Got connect request " + req); + } + if (req == null) + throw new InvalidObjectException(INVALID_CONNECT + + "ConnectRequestMessage is null"); + + URI uri = req.getTarget(); + if (uri == null) + throw new InvalidObjectException(INVALID_CONNECT + + "Target URI is null"); + String path = uri.getPath(); + if (path == null) + throw new InvalidObjectException(INVALID_CONNECT + + "Target path is null"); + + TCPServerSOContainer srs = (TCPServerSOContainer) get(path); + if (Debug.ON && debug != null) { + debug.msg("Found container with " + srs.getID().getName() + + " for target " + uri); + } + if (srs == null) + throw new InvalidObjectException("Container for target " + path + + " not found!"); + + // Create our local messaging interface + Client newClient = new Client(aSocket, iStream, oStream, srs + .getReceiver(), srs.keepAlive); + + // No other threads can access messaging interface until space has + // accepted/rejected + // connect request + synchronized (newClient) { + // Call checkConnect + Serializable resp = (Serializable) ((ConnectionRequestHandler) srs) + .checkConnect(aSocket, path, req.getData(), newClient); + // Create connect response wrapper and send it back + oStream.writeObject(new ConnectResultMessage(resp)); + oStream.flush(); + } + } + + public synchronized void takeOffTheAir() { + if (listener != null) { + if (Debug.ON && debug != null) { + debug.msg("Taking " + getName() + " on the air."); + } + try { + listener.close(); + } catch (IOException e) { + if (Debug.ON && debug != null) { + debug.dumpStack(e, "Exception in closeListener"); + } + } + listener = null; + } + isOnTheAir = false; + } + + public int getPort() { + return port; + } + + public String toString() { + return super.toString() + ";port:" + port; + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/events/SharedObjectCallEvent.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/events/SharedObjectCallEvent.java new file mode 100644 index 000000000..816828aa4 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/events/SharedObjectCallEvent.java @@ -0,0 +1,42 @@ +package org.eclipse.ecf.provider.generic.events; + +import org.eclipse.ecf.core.events.ISharedObjectCallEvent; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.AsynchResult; +import org.eclipse.ecf.core.util.Event; + +public class SharedObjectCallEvent implements ISharedObjectCallEvent { + + ID sender; + Event event; + AsynchResult result; + + public SharedObjectCallEvent(ID sender, Event evt, AsynchResult res) { + super(); + this.sender = sender; + this.event = evt; + this.result = res; + } + + /* (non-Javadoc) + * @see org.eclipse.ecf.core.events.ISharedObjectCallEvent#getAsynchResult() + */ + public AsynchResult getAsynchResult() { + return result; + } + + /* (non-Javadoc) + * @see org.eclipse.ecf.core.events.ISharedObjectEvent#getSenderSharedObjectID() + */ + public ID getSenderSharedObjectID() { + return sender; + } + + /* (non-Javadoc) + * @see org.eclipse.ecf.core.events.ISharedObjectEvent#getEvent() + */ + public Event getEvent() { + return event; + } + +} diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/GMMImpl.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/GMMImpl.java new file mode 100644 index 000000000..48fd07a23 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/GMMImpl.java @@ -0,0 +1,80 @@ +package org.eclipse.ecf.provider.generic.gmm; + +import java.util.Observable; +import java.util.TreeSet; +import java.util.Iterator; + +import org.eclipse.ecf.core.identity.ID; + +public class GMMImpl extends Observable { + + TreeSet mySet; + + public GMMImpl() { + mySet = new TreeSet(); + } + + public boolean addMember(Member m) { + boolean res = mySet.add(m); + if (res) { + setChanged(); + notifyObservers(new MemberChanged(m, true)); + } + return res; + } + + public boolean removeMember(Member m) { + boolean res = mySet.remove(m); + if (res) { + setChanged(); + notifyObservers(new MemberChanged(m, false)); + } + return res; + } + + public void removeAllMembers() { + Object members[] = getMembers(); + for (int i = 0; i < members.length; i++) { + removeMember((Member) members[i]); + } + } + + public Object[] getMembers() { + return mySet.toArray(); + } + + public ID[] getMemberIDs(ID exclude) { + TreeSet newSet = null; + if (exclude != null) { + newSet = (TreeSet) mySet.clone(); + newSet.remove(new Member(exclude)); + } else { + newSet = mySet; + } + ID ids[] = new ID[newSet.size()]; + Iterator iter = newSet.iterator(); + int j = 0; + while (iter.hasNext()) { + ids[j++] = (ID) ((Member) iter.next()).getID(); + } + return ids; + } + + public int getSize() { + return mySet.size(); + } + + public boolean containsMember(Member m) { + return mySet.contains(m); + } + + public Iterator iterator() { + return mySet.iterator(); + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("GMM").append(mySet); + return sb.toString(); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/Member.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/Member.java new file mode 100644 index 000000000..35dcd7656 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/Member.java @@ -0,0 +1,53 @@ +package org.eclipse.ecf.provider.generic.gmm; + +import org.eclipse.ecf.core.identity.ID; + +public class Member implements Comparable { + + ID member; + Object data; + + public Member(ID member) { + this(member, null); + } + + public Member(ID member, Object data) { + this.member = member; + this.data = data; + } + + public boolean equals(Object o) { + if (o != null && o instanceof Member) { + return member.equals(((Member) o).member); + } else + return false; + } + + public int hashCode() { + return member.hashCode(); + } + + public int compareTo(Object o) { + if (o != null && o instanceof Member) { + return member.compareTo(((Member) o).member); + } else + return 0; + } + + public ID getID() { + return member; + } + + public Object getData() { + return data; + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("Member[").append(member).append(";").append( + data).append( + "]"); + return sb.toString(); + } + +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/MemberChanged.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/MemberChanged.java new file mode 100644 index 000000000..3db3ed410 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/gmm/MemberChanged.java @@ -0,0 +1,19 @@ +package org.eclipse.ecf.provider.generic.gmm; + +public class MemberChanged { + Member member; + boolean added; + + public MemberChanged(Member member, boolean added) { + this.member = member; + this.added = added; + } + + public Member getMember() { + return member; + } + + public boolean getAdded() { + return added; + } +}
\ No newline at end of file |
