diff options
| author | slewis | 2004-12-30 08:07:30 +0000 |
|---|---|---|
| committer | slewis | 2004-12-30 08:07:30 +0000 |
| commit | 37daa716c6170cf095d9b547f9c3517dd81f465b (patch) | |
| tree | 13b05ebe14a37210a99e3332288e9ab28e40c9e1 | |
| parent | eb3e68909c683017264d237a446137a2e6ae8e31 (diff) | |
| download | org.eclipse.ecf-37daa716c6170cf095d9b547f9c3517dd81f465b.tar.gz org.eclipse.ecf-37daa716c6170cf095d9b547f9c3517dd81f465b.tar.xz org.eclipse.ecf-37daa716c6170cf095d9b547f9c3517dd81f465b.zip | |
Added tracing to org.eclipse.ecf.provider project (see .options file). Modified test cases. Added event class and interface
21 files changed, 1396 insertions, 968 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/.options b/framework/bundles/org.eclipse.ecf.provider/.options index 34aa5dee7..71d77b0c4 100644 --- a/framework/bundles/org.eclipse.ecf.provider/.options +++ b/framework/bundles/org.eclipse.ecf.provider/.options @@ -1,3 +1,7 @@ org.eclipse.ecf.provider/debug = true -org.eclipse.ecf.provider/debug/flag = true org.eclipse.ecf.provider/debug/filter = * +org.eclipse.ecf.provider/debug/flag = true +org.eclipse.ecf.provider/debug/connection = true +org.eclipse.ecf.provider/debug/container = true +org.eclipse.ecf.provider/debug/sharedobjectwrapper = true +org.eclipse.ecf.provider/debug/sharedobjectmanager = true
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java index c8fd8dc1d..8b49b1ee1 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java @@ -1,27 +1,61 @@ package org.eclipse.ecf.provider; +import java.text.SimpleDateFormat; +import java.util.Date; + import org.eclipse.core.runtime.Platform; public class Trace { - public static boolean ON = Platform.inDebugMode(); + public static boolean ON = false; + protected static boolean isEclipse = false; + protected static String pluginName = ""; + protected static String debugPrefix = "/debug/"; - public static Trace create(String key) { - return new Trace(key); + static { + try { + ON = Platform.inDebugMode(); + isEclipse = true; + pluginName = ProviderPlugin.getDefault().getBundle().getSymbolicName(); + } catch (Exception e) { + // No eclipse Platform available + } } - public static void errDumpStack(Throwable e, String msg) { - System.err.println(msg); - e.printStackTrace(System.err); + public static void setTrace(boolean on) { + ON = on; + } + + public static Trace create(String key) { + if (isEclipse) { + String res = Platform.getDebugOption(pluginName+debugPrefix+key); + if (res != null) { + Boolean on = new Boolean(res); + if (on.booleanValue()) return new Trace(pluginName+"("+key+")"); + else return null; + } else { + return null; + } + } else return new Trace(key); } + + String name; + public void dumpStack(Throwable e, String msg) { - System.err.println(msg); + msg(msg); e.printStackTrace(System.err); } public void msg(String msg) { - System.err.println(msg); + System.err.println(name+"["+getTimeString()+"]"+msg); + } + + protected static String getTimeString() { + Date d = new Date(); + SimpleDateFormat df = new SimpleDateFormat("MM/dd/yy;HH:mm:ss:SSS"); + return df.format(d); } protected Trace(String str) { + name = str; } public static void setThreadDebugGroup(Object obj) { // Do nothing diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java index b7d20717c..53adeb6fd 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/AsynchMessage.java @@ -15,4 +15,9 @@ public class AsynchMessage implements Serializable { Serializable getData() { return data; } + public String toString() { + StringBuffer buf = new StringBuffer("AsynchMessage["); + buf.append(data).append("]"); + return buf.toString(); + } }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java index 3ee916a65..d855d07c6 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java @@ -51,7 +51,7 @@ public final class Client implements ISynchAsynchConnection { } public static final String PROTOCOL = "ecftcp"; - public static final Trace debug = Trace.create(Client.class.getName()); + public static final Trace debug = Trace.create("connection"); public static final int SNDR_PRIORITY = Thread.NORM_PRIORITY; public static final int RCVR_PRIORITY = Thread.NORM_PRIORITY; @@ -90,6 +90,16 @@ public final class Client implements ISynchAsynchConnection { public Map getProperties() { return properties; } + protected void debug(String msg) { + if (Trace.ON && debug != null) { + debug.msg(msg); + } + } + protected void dumpStack(String msg, Throwable e) { + if (Trace.ON && debug != null) { + debug.dumpStack(e,msg); + } + } public void setProperties(Map props) { this.properties = props; } @@ -180,6 +190,9 @@ public final class Client implements ISynchAsynchConnection { public synchronized Object connect(ID remote, Object data, int timeout) throws IOException { + + debug("connect("+remote+","+data+","+timeout+")"); + if (socket != null) throw new ConnectException("Client already connected"); @@ -197,10 +210,8 @@ public final class Client implements ISynchAsynchConnection { SocketFactory fact = SocketFactory.getSocketFactory(); if (fact == null) { fact = SocketFactory.getDefaultSocketFactory(); - } - if (Trace.ON && debug != null) { - debug.msg("Connecting to " + address + ":" + port); - } + } + debug("socket connecting to "+address+":"+port); // Actually connect to remote using socket from socket factory. socket = fact.createSocket(address, port, timeout); // Set TCP no delay @@ -217,6 +228,7 @@ public final class Client implements ISynchAsynchConnection { ConnectResultMessage res = null; res = (ConnectResultMessage) readObject(); + debug("recv:"+address+":"+port+":"+res); // Setup threads setupThreads(); // Return results. @@ -265,10 +277,7 @@ public final class Client implements ISynchAsynchConnection { } else msgCount++; } catch (IOException e) { - // Log to stderr - Trace.errDumpStack(e, "Exception in sender thread for " - + address + ":" + port); - + //dumpStack("read",e); if (isClosing) { isClosing = false; synchronized (Client.this) { @@ -285,19 +294,15 @@ public final class Client implements ISynchAsynchConnection { break; } } - if (Trace.ON && debug != null) { - debug.msg("Sndr for " + address + ":" + port - + " terminating."); - } + debug("sender:"+address+":"+port+" terminating"); } - }, "Sndr for " + address + ":" + port); + }, "sndr:" + address + ":" + port); // Set priority for new thread aThread.setPriority(SNDR_PRIORITY); return aThread; } private void closeSocket() { - // Close socket try { if (socket != null) socket.close(); @@ -307,6 +312,7 @@ public final class Client implements ISynchAsynchConnection { private void sendIt(Serializable snd) throws IOException { // Write object to output stream + debug("send:"+address+":"+port+":"+snd); synchronized (outputStream) { outputStream.writeObject(snd); outputStream.flush(); @@ -336,8 +342,8 @@ public final class Client implements ISynchAsynchConnection { } } // Before returning, actually remove remote objects - handler.handleDisconnectEvent(new DisconnectConnectionEvent( - Client.this, null, queue)); + //handler.handleDisconnectEvent(new DisconnectConnectionEvent( + //Client.this, null, queue)); } private Thread getRcvThread() { @@ -351,15 +357,7 @@ public final class Client implements ISynchAsynchConnection { try { handleRcv(readObject()); } catch (IOException e) { - // Log to stderr - Trace.errDumpStack(e, "Exception in read thread for " - + address + ":" + port); - - if (Trace.ON && debug != null) { - debug.dumpStack(e, "Exception in read thread for " - + address + ":" + port + ": " - + e.getMessage()); - } + //dumpStack("read",e); if (isClosing) { isClosing = false; synchronized (Client.this) { @@ -376,12 +374,9 @@ public final class Client implements ISynchAsynchConnection { break; } } - if (Trace.ON && debug != null) { - debug.msg("Rcvr for " + address + ":" + port - + " terminating."); - } + debug("read:"+address+":"+port+" terminating"); } - }, "Rcvr for " + address + ":" + port); + }, "rcvr:" + address + ":" + port); // Set priority and return aThread.setPriority(RCVR_PRIORITY); return aThread; @@ -391,6 +386,7 @@ public final class Client implements ISynchAsynchConnection { try { // We've received some data, so the connection is alive receiveResp(); + debug("recv:"+address+":"+port+":"+rcv); // Handle all messages if (rcv instanceof SynchMessage) { // Handle synch message. The only valid synch message is @@ -407,7 +403,7 @@ public final class Client implements ISynchAsynchConnection { } else if (rcv instanceof PingResponseMessage) { // Do nothing with ping response } else - throw new IOException("Invalid packet received."); + throw new IOException("Invalid message received."); } catch (IOException e) { disconnect(); throw e; @@ -415,6 +411,7 @@ public final class Client implements ISynchAsynchConnection { } public synchronized void start() { + debug("start("+address+":"+port+")"); if (sendThread != null) sendThread.start(); if (rcvThread != null) @@ -463,13 +460,7 @@ public final class Client implements ISynchAsynchConnection { } } } catch (Exception e) { - // Log to stderr - Trace.errDumpStack(e, "Exception in ping thread for " - + address + ":" + port); - - if (Trace.ON && debug != null) { - debug.dumpStack(e, "Exception in ping."); - } + //dumpStack("ping",e); if (isClosing) { isClosing = false; synchronized (Client.this) { @@ -486,17 +477,13 @@ public final class Client implements ISynchAsynchConnection { break; } } - if (Trace.ON && debug != null) { - debug.msg("Keepalive terminating."); - } + debug("ping:"+address+":"+port+" terminating"); } - }, "Keepalive " + address + ":" + port); + }, "ping:" + address + ":" + port); } public synchronized void disconnect() throws IOException { - if (Trace.ON && debug != null) { - debug.msg("disconnect()"); - } + debug("disconnect("+address+":"+port+")"); // Close send queue and socket queue.close(); closeSocket(); @@ -521,8 +508,8 @@ public final class Client implements ISynchAsynchConnection { public void sendAsynch(ID recipient, byte[] obj) throws IOException { queueObject(recipient, obj); } - public void sendAsynch(ID recipien, Object obj) throws IOException { - queueObject(recipien, (Serializable) obj); + public void sendAsynch(ID recipient, Object obj) throws IOException { + queueObject(recipient, (Serializable) obj); } public synchronized void queueObject(ID recipient, Serializable obj) throws IOException { @@ -550,9 +537,7 @@ public final class Client implements ISynchAsynchConnection { try { ret = (Serializable) inputStream.readObject(); } catch (ClassNotFoundException e) { - if (Trace.ON && debug != null) { - debug.dumpStack(e, "Class not found exception"); - } + dumpStack("ClassNotFoundException",e); throw new IOException( "Protocol violation due to class load failure. " + e.getMessage()); diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java index 4b3b548a1..293946746 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectRequestMessage.java @@ -22,6 +22,8 @@ public class ConnectRequestMessage implements Serializable { } public String toString() { - return "ConnectRequestMessage[target:" + target + ";data:" + data + "]"; + StringBuffer buf = new StringBuffer("ConnectRequestMessage["); + buf.append(target).append(";").append(data).append("]"); + return buf.toString(); } }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java index 56ee36524..08572d311 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ConnectResultMessage.java @@ -13,4 +13,10 @@ public class ConnectResultMessage implements Serializable { public Serializable getData() { return data; } + + public String toString() { + StringBuffer buf = new StringBuffer("ConnectResultMessage["); + buf.append(data).append("]"); + return buf.toString(); + } }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java index 621de79bd..206706b17 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectInputStream.java @@ -10,29 +10,21 @@ public class ExObjectInputStream extends ObjectInputStream { private boolean replace = false; - public static final Trace debug = Trace.create(ExObjectInputStream.class - .getName()); + public static final Trace debug = Trace.create("connection"); public ExObjectInputStream(InputStream in) throws IOException, SecurityException { super(in); - if (Trace.ON && debug != null) { - debug.msg("ExObjectInputStream(" + in + ")"); - } } public ExObjectInputStream(InputStream in, boolean backwardCompatibility) throws IOException, SecurityException { super(in); - if (Trace.ON && debug != null) { - debug.msg("ExObjectInputStream(" + in + "," + backwardCompatibility - + ")"); - } if (backwardCompatibility) { try { super.enableResolveObject(true); replace = true; - debug("ExObjectInputStream.compatibility set"); + debug("resolveObject"); } catch (Exception e) { throw new IOException( "Could not setup backward compatibility object replacers for ExObjectInputStream"); @@ -43,12 +35,11 @@ public class ExObjectInputStream extends ObjectInputStream { protected void debug(String msg) { if (Trace.ON && debug != null) { debug.msg(msg); - } + } } - protected void debug(String msg, Throwable t) { + protected void dumpStack(String msg, Throwable e) { if (Trace.ON && debug != null) { - debug.dumpStack(t, msg); - } + debug.dumpStack(e,msg); + } } - }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java index f71c6e249..c2be430e0 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/ExObjectOutputStream.java @@ -10,12 +10,10 @@ public class ExObjectOutputStream extends ObjectOutputStream { private boolean replace = false; - public static final Trace debug = Trace.create(ExObjectOutputStream.class - .getName()); + public static final Trace debug = Trace.create("connection"); public ExObjectOutputStream(OutputStream out) throws IOException { super(out); - debug("ExObjectOutputStream(" + out + ")"); } public ExObjectOutputStream(OutputStream out, boolean backwardCompatibility) @@ -25,7 +23,7 @@ public class ExObjectOutputStream extends ObjectOutputStream { try { super.enableReplaceObject(true); replace = true; - debug("ExObjectOutputStream.compatibility set"); + debug("replaceObject"); } catch (Exception e) { throw new IOException( "Could not setup backward compatibility object replacers for ExObjectOutputStream"); @@ -35,11 +33,11 @@ public class ExObjectOutputStream extends ObjectOutputStream { protected void debug(String msg) { if (Trace.ON && debug != null) { debug.msg(msg); - } + } } - protected void debug(String msg, Throwable t) { + protected void dumpStack(String msg, Throwable e) { if (Trace.ON && debug != null) { - debug.dumpStack(t, msg); - } + debug.dumpStack(e,msg); + } } }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java index f17b7ebd2..822b48d73 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingMessage.java @@ -5,4 +5,8 @@ import java.io.Serializable; public class PingMessage implements Serializable { protected PingMessage() { } + public String toString() { + StringBuffer buf = new StringBuffer("PingMessage"); + return buf.toString(); + } }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java index d262b1fb3..49290a5ec 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/PingResponseMessage.java @@ -5,4 +5,8 @@ import java.io.Serializable; public class PingResponseMessage implements Serializable { protected PingResponseMessage() { } + public String toString() { + StringBuffer buf = new StringBuffer("PingResponseMessage"); + return buf.toString(); + } }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java index 590788f39..624ee2744 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Server.java @@ -7,13 +7,23 @@ import java.net.Socket; import org.eclipse.ecf.provider.Trace; public class Server extends ServerSocket { - public static Trace debug = Trace.create(Server.class.getName()); + public static Trace debug = Trace.create("connection"); ISocketAcceptHandler acceptHandler; Thread listenerThread; ThreadGroup threadGroup; + protected void debug(String msg) { + if (Trace.ON && debug != null) { + debug.msg(msg); + } + } + protected void dumpStack(String msg, Throwable e) { + if (Trace.ON && debug != null) { + debug.dumpStack(e,msg); + } + } public Server(ThreadGroup group, int port, ISocketAcceptHandler handler) throws IOException { super(port); @@ -55,30 +65,21 @@ public class Server extends ServerSocket { new Thread(threadGroup, new Runnable() { public void run() { try { + debug("accept:"+aSocket.getInetAddress()); acceptHandler.handleAccept(aSocket); } catch (Exception e) { - if (Trace.ON && debug != null) { - debug.dumpStack(e, - "Unexplained exception in connect. Closing."); - } + dumpStack("Unexpected exception in handleAccept...closing",e); try { aSocket.close(); } catch (IOException e1) { } - ; } finally { - if (Trace.ON && debug != null) { - debug.msg("handleAcceptAsych terminating."); - } } } }).start(); } public synchronized void close() throws IOException { - if (Trace.ON && debug != null) { - debug.msg("close()"); - } super.close(); if (listenerThread != null) { listenerThread.interrupt(); diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java index e7c6d8872..ad6e7d0be 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/SynchMessage.java @@ -10,4 +10,9 @@ public class SynchMessage extends AsynchMessage { protected SynchMessage() { super(); } + public String toString() { + StringBuffer buf = new StringBuffer("SynchMessage["); + buf.append(data).append("]"); + return buf.toString(); + } }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java index 125a6ae2a..73caaa0c8 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ClientSOContainer.java @@ -61,6 +61,7 @@ public abstract class ClientSOContainer extends SOContainer { try { if (isClosing) throw new IllegalStateException("container is closing"); + debug("joingroup:"+remote+":"+data); ISynchAsynchConnection aConnection = getClientConnection( remote, data); if (aConnection == null) { @@ -137,6 +138,10 @@ public abstract class ClientSOContainer extends SOContainer { } } + protected void handleViewChangeMessage(ContainerMessage mess) throws IOException { + // XXX TODO + } + protected void forwardExcluding(ID from, ID excluding, ContainerMessage data) throws IOException { // NOP @@ -149,6 +154,7 @@ public abstract class ClientSOContainer extends SOContainer { return null; } public void leaveGroup() { + debug("leaveGroup"); synchronized (connectLock) { // If we are currently connected if (isConnected()) { diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java index 3728b67e0..5b9e4c6f3 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ContainerMessage.java @@ -5,162 +5,296 @@ import org.eclipse.ecf.core.identity.ID; public class ContainerMessage implements Serializable { - ID fromContainerID; - public ID toContainerID; - long sequence; - Serializable data; - - /** - * @return Returns the data. - */ - public Serializable getData() { - return data; - } - /** - * @param data - * The data to set. - */ - public void setData(Serializable data) { - this.data = data; - } - /** - * @return Returns the fromContainerID. - */ - public ID getFromContainerID() { - return fromContainerID; - } - /** - * @param fromContainerID - * The fromContainerID to set. - */ - public void setFromContainerID(ID fromContainerID) { - this.fromContainerID = fromContainerID; - } - /** - * @return Returns the sequence. - */ - public long getSequence() { - return sequence; - } - /** - * @param sequence - * The sequence to set. - */ - public void setSequence(long sequence) { - this.sequence = sequence; - } - /** - * @return Returns the toContainerID. - */ - public ID getToContainerID() { - return toContainerID; - } - /** - * @param toContainerID - * The toContainerID to set. - */ - public void setToContainerID(ID toContainerID) { - this.toContainerID = toContainerID; - } - static ContainerMessage makeViewChangeMessage(ID from, ID to, long seq, - ID ids[], boolean add, Serializable data) { - return new ContainerMessage(from, to, seq, new ViewChangeMessage(ids, - add, data)); - } - static ContainerMessage makeJoinGroupMessage(ID from, ID to, long seq, - Serializable data) { - return new ContainerMessage(from, to, seq, new JoinGroupMessage(data)); - } - static ContainerMessage makeLeaveGroupMessage(ID from, ID to, long seq, - Serializable data) { - return new ContainerMessage(from, to, seq, new LeaveGroupMessage(data)); - } - static ContainerMessage makeSharedObjectCreateMessage(ID from, ID to, - long seq, Serializable data) { - return new ContainerMessage(from, to, seq, new CreateMessage(data)); - } - static ContainerMessage makeSharedObjectCreateResponseMessage(ID from, - ID to, long contSeq, ID soID, Throwable e, long sequence) { - return new ContainerMessage(from, to, contSeq, - new CreateResponseMessage(soID, e, sequence)); - } - static ContainerMessage makeSharedObjectMessage(ID from, ID to, long seq, - ID fromSharedObject, Serializable data) { - return new ContainerMessage(from, to, seq, new SharedObjectMessage( - fromSharedObject, data)); - } - static ContainerMessage makeSharedObjectDisposeMessage(ID from, ID to, - long seq, ID sharedObjectID) { - return new ContainerMessage(from, to, seq, - new SharedObjectDisposeMessage(sharedObjectID)); - } - protected ContainerMessage(ID from, ID to, long seq, Serializable data) { - this.fromContainerID = from; - this.toContainerID = to; - this.sequence = seq; - this.data = data; - } - - public static final class ViewChangeMessage implements Serializable { - ID changeIDs[]; - boolean add; - Serializable data; - ViewChangeMessage(ID id[], boolean a, Serializable data) { - this.changeIDs = id; - this.add = a; - this.data = data; - } - } - public static final class CreateMessage implements Serializable { - Serializable data; - CreateMessage(Serializable data) { - this.data = data; - } - } - public static final class CreateResponseMessage implements Serializable { - ID sharedObjectID; - Throwable exception; - long sequence; - public CreateResponseMessage(ID objID, Throwable except, long sequence) { - this.sharedObjectID = objID; - this.exception = except; - this.sequence = sequence; - } - } - public static final class SharedObjectMessage implements Serializable { - Serializable data; - ID fromSharedObjectID; - SharedObjectMessage(ID fromSharedObject, Serializable data) { - this.fromSharedObjectID = fromSharedObject; - this.data = data; - } - } - public static final class SharedObjectDisposeMessage implements - Serializable { - ID sharedObjectID; - SharedObjectDisposeMessage(ID objID) { - this.sharedObjectID = objID; - } - } - - public static final class JoinGroupMessage implements Serializable { - Serializable data; - - public JoinGroupMessage(Serializable data) { - this.data = data; - } - public Serializable getData() { - return data; - } - } - public static final class LeaveGroupMessage implements Serializable { - Serializable data; - - public LeaveGroupMessage(Serializable data) { - this.data = data; - } - public Serializable getData() { - return data; - } - } + ID fromContainerID; + + public ID toContainerID; + + long sequence; + + Serializable data; + + /** + * @return Returns the data. + */ + public Serializable getData() { + return data; + } + + /** + * @param data + * The data to set. + */ + public void setData(Serializable data) { + this.data = data; + } + + /** + * @return Returns the fromContainerID. + */ + public ID getFromContainerID() { + return fromContainerID; + } + + /** + * @param fromContainerID + * The fromContainerID to set. + */ + public void setFromContainerID(ID fromContainerID) { + this.fromContainerID = fromContainerID; + } + + /** + * @return Returns the sequence. + */ + public long getSequence() { + return sequence; + } + + /** + * @param sequence + * The sequence to set. + */ + public void setSequence(long sequence) { + this.sequence = sequence; + } + + /** + * @return Returns the toContainerID. + */ + public ID getToContainerID() { + return toContainerID; + } + + /** + * @param toContainerID + * The toContainerID to set. + */ + public void setToContainerID(ID toContainerID) { + this.toContainerID = toContainerID; + } + + static ContainerMessage makeViewChangeMessage(ID from, ID to, long seq, + ID ids[], boolean add, Serializable data) { + return new ContainerMessage(from, to, seq, new ViewChangeMessage(ids, + add, data)); + } + + static ContainerMessage makeJoinGroupMessage(ID from, ID to, long seq, + Serializable data) { + return new ContainerMessage(from, to, seq, new JoinGroupMessage(data)); + } + + static ContainerMessage makeLeaveGroupMessage(ID from, ID to, long seq, + Serializable data) { + return new ContainerMessage(from, to, seq, new LeaveGroupMessage(data)); + } + + static ContainerMessage makeSharedObjectCreateMessage(ID from, ID to, + long seq, Serializable data) { + return new ContainerMessage(from, to, seq, new CreateMessage(data)); + } + + static ContainerMessage makeSharedObjectCreateResponseMessage(ID from, + ID to, long contSeq, ID soID, Throwable e, long sequence) { + return new ContainerMessage(from, to, contSeq, + new CreateResponseMessage(soID, e, sequence)); + } + + static ContainerMessage makeSharedObjectMessage(ID from, ID to, long seq, + ID fromSharedObject, Serializable data) { + return new ContainerMessage(from, to, seq, new SharedObjectMessage( + fromSharedObject, data)); + } + + static ContainerMessage makeSharedObjectDisposeMessage(ID from, ID to, + long seq, ID sharedObjectID) { + return new ContainerMessage(from, to, seq, + new SharedObjectDisposeMessage(sharedObjectID)); + } + + protected ContainerMessage(ID from, ID to, long seq, Serializable data) { + this.fromContainerID = from; + this.toContainerID = to; + this.sequence = seq; + this.data = data; + } + + public String toString() { + StringBuffer sb = new StringBuffer("ContainerMessage["); + sb.append(fromContainerID).append(";").append(toContainerID) + .append(";"); + sb.append(sequence).append(";").append(data).append("]"); + return sb.toString(); + } + + public static final class ViewChangeMessage implements Serializable { + ID changeIDs[]; + + boolean add; + + Serializable data; + + ViewChangeMessage(ID id[], boolean a, Serializable data) { + this.changeIDs = id; + this.add = a; + this.data = data; + } + + protected String printChangeIDs() { + if (changeIDs == null) + return "null"; + StringBuffer buf = new StringBuffer(); + for (int i = 0; i < changeIDs.length; i++) { + buf.append(changeIDs[i]); + if (i != (changeIDs.length - 1)) + buf.append(","); + } + return buf.toString(); + } + + public String toString() { + StringBuffer sb = new StringBuffer("ViewChangeMessage["); + sb.append(printChangeIDs()).append(";").append(add).append(";").append(data) + .append("]"); + return sb.toString(); + } + } + + public static final class CreateMessage implements Serializable { + Serializable data; + + CreateMessage(Serializable data) { + this.data = data; + } + public Serializable getData() { + return data; + } + public String toString() { + StringBuffer sb = new StringBuffer("CreateMessage["); + sb.append(data).append("]"); + return sb.toString(); + } + } + + public static final class CreateResponseMessage implements Serializable { + ID sharedObjectID; + + Throwable exception; + + long sequence; + + public CreateResponseMessage(ID objID, Throwable except, long sequence) { + this.sharedObjectID = objID; + this.exception = except; + this.sequence = sequence; + } + + public String toString() { + StringBuffer sb = new StringBuffer("CreateResponseMessage["); + sb.append(sharedObjectID).append(";").append(exception).append(";") + .append(sequence).append("]"); + return sb.toString(); + } + /** + * @return Returns the exception. + */ + public Throwable getException() { + return exception; + } + /** + * @return Returns the sequence. + */ + public long getSequence() { + return sequence; + } + /** + * @return Returns the sharedObjectID. + */ + public ID getSharedObjectID() { + return sharedObjectID; + } + } + + public static final class SharedObjectMessage implements Serializable { + Serializable data; + + ID fromSharedObjectID; + + SharedObjectMessage(ID fromSharedObject, Serializable data) { + this.fromSharedObjectID = fromSharedObject; + this.data = data; + } + + public String toString() { + StringBuffer sb = new StringBuffer("SharedObjectMessage["); + sb.append(fromSharedObjectID).append(";").append(data).append("]"); + return sb.toString(); + } + /** + * @return Returns the data. + */ + public Serializable getData() { + return data; + } + /** + * @return Returns the fromSharedObjectID. + */ + public ID getFromSharedObjectID() { + return fromSharedObjectID; + } + } + + public static final class SharedObjectDisposeMessage implements + Serializable { + ID sharedObjectID; + + SharedObjectDisposeMessage(ID objID) { + this.sharedObjectID = objID; + } + public String toString() { + StringBuffer sb = new StringBuffer("SharedObjectDisposeMessage["); + sb.append(sharedObjectID).append("]"); + return sb.toString(); + } + /** + * @return Returns the sharedObjectID. + */ + public ID getSharedObjectID() { + return sharedObjectID; + } + } + + public static final class JoinGroupMessage implements Serializable { + Serializable data; + + public JoinGroupMessage(Serializable data) { + this.data = data; + } + + public Serializable getData() { + return data; + } + public String toString() { + StringBuffer sb = new StringBuffer("JoinGroupMessage["); + sb.append(data).append("]"); + return sb.toString(); + } + } + + public static final class LeaveGroupMessage implements Serializable { + Serializable data; + + public LeaveGroupMessage(Serializable data) { + this.data = data; + } + + public Serializable getData() { + return data; + } + public String toString() { + StringBuffer sb = new StringBuffer("LeaveGroupMessage["); + sb.append(data).append("]"); + return sb.toString(); + } + } }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java index 85ad8b1c1..9aad15073 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java @@ -4,9 +4,11 @@ */ package org.eclipse.ecf.provider.generic; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.NotSerializableException; +import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.security.AccessController; @@ -36,639 +38,883 @@ import org.eclipse.ecf.core.comm.SynchConnectionEvent; import org.eclipse.ecf.core.events.IContainerEvent; import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.core.util.Event; +import org.eclipse.ecf.provider.Trace; import org.eclipse.ecf.provider.generic.gmm.Member; public abstract class SOContainer implements ISharedObjectContainer { - public static final String DEFAULT_OBJECT_ARG_KEY = SOContainer.class - .getName() - + ".sharedobjectargs"; - public static final String DEFAULT_OBJECT_ARGTYPES_KEY = SOContainer.class - .getName() - + ".sharedobjectargs"; - - private long sequenceNumber = 0L; - private Vector listeners = null; - - protected ISharedObjectContainerConfig config = null; - protected SOContainerGMM groupManager = null; - protected ThreadGroup loadingThreadGroup = null; - protected ThreadGroup sharedObjectThreadGroup = null; - protected SOManager sharedObjectManager = null; - protected boolean isClosing = false; - protected MessageReceiver receiver; - - protected byte[] getBytesForObject(Serializable obj) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(obj); - return bos.toByteArray(); - } - - protected void killConnection(IConnection conn) { - try { - if (conn != null) - conn.disconnect(); - } catch (IOException e) { - // XXX report - } - } - protected void memberLeave(ID target, IConnection conn) { - if (conn != null) - killConnection(conn); - } - protected void fireContainerEvent(IContainerEvent event) { - synchronized (listeners) { - for (Iterator i = listeners.iterator(); i.hasNext();) - ((ISharedObjectContainerListener) i.next()).handleEvent(event); - } - } - protected long getNextSequenceNumber() { - if (sequenceNumber == Long.MAX_VALUE) { - sequenceNumber = 0; - return sequenceNumber; - } else - return sequenceNumber++; - } - protected Object getGroupMembershipLock() { - return groupManager; - } - public ID[] getOtherMemberIDs() { - return groupManager.getOtherMemberIDs(); - } - protected ISharedObject getSharedObject(ID id) { - SOWrapper wrap = getSharedObjectWrapper(id); - if (wrap == null) - return null; - else - return wrap.getSharedObject(); - } - protected ISharedObject addSharedObjectAndWait(SharedObjectDescription sd, - ISharedObject s, ISharedObjectContainerTransaction t) - throws Exception { - if (sd.getID() == null || s == null) - return null; - ISharedObject so = addSharedObject0(sd, s); - // Wait right here until committed - if (t != null) - t.waitToCommit(); - return s; - } - protected ISharedObject addSharedObject0(SharedObjectDescription sd, - ISharedObject s) throws Exception { - addSharedObjectWrapper(makeNewSharedObjectWrapper(sd, s)); - return s; - } - protected SOWrapper makeNewSharedObjectWrapper(SharedObjectDescription sd, - ISharedObject s) { - SOConfig newConfig = makeNewSharedObjectConfig(sd, this); - return new SOWrapper(newConfig, s, this); - } - protected SOConfig makeNewSharedObjectConfig(SharedObjectDescription sd, - SOContainer cont) { - ID homeID = sd.getHomeID(); - if (homeID == null) - homeID = getID(); - return new SOConfig(sd.getID(), homeID, this, sd.getProperties()); - } - protected SOWrapper getSharedObjectWrapper(ID id) { - return groupManager.getFromActive(id); - } - protected void addSharedObjectWrapper(SOWrapper wrapper) throws Exception { - if (wrapper == null) - return; - ID id = wrapper.getObjID(); - synchronized (getGroupMembershipLock()) { - Object obj = groupManager.getFromAny(id); - if (obj != null) { - throw new SharedObjectAddException("SharedObject with id " - + id.getName() + " already in container"); - } - // Call initialize. If this throws it halts everything - wrapper.init(); - // Put in table - groupManager.addSharedObjectToActive(wrapper); - } - } - protected ISharedObject removeSharedObject(ID id) { - synchronized (getGroupMembershipLock()) { - SOWrapper wrap = groupManager.getFromActive(id); - if (wrap == null) - return null; - groupManager.removeSharedObject(id); - return wrap.getSharedObject(); - } - } - protected boolean addNewRemoteMember(ID memberID, Object data) { - return groupManager.addMember(new Member(memberID, data)); - } - abstract protected void queueContainerMessage(ContainerMessage mess) - throws IOException; - abstract protected void forwardToRemote(ID from, ID to, - ContainerMessage data) throws IOException; - abstract protected void forwardExcluding(ID from, ID excluding, - ContainerMessage data) throws IOException; - protected final void forward(ID fromID, ID toID, ContainerMessage data) - throws IOException { - if (toID == null) { - forwardExcluding(fromID, fromID, data); - } else { - forwardToRemote(fromID, toID, data); - } - } - protected boolean removeRemoteMember(ID remoteMember) { - return groupManager.removeMember(remoteMember); - } - protected void sendMessage(ContainerMessage data) throws IOException { - synchronized (getGroupMembershipLock()) { - ID ourID = getID(); - // We don't send to ourselves - if (!ourID.equals(data.getToContainerID())) - queueContainerMessage(data); - } - } - protected ID[] sendCreateSharedObjectMessage(ID toContainerID, - SharedObjectDescription sd) throws IOException { - ID[] returnIDs = null; - if (toContainerID == null) { - synchronized (getGroupMembershipLock()) { - // Send message to all - sendMessage(ContainerMessage.makeSharedObjectCreateMessage( - getID(), toContainerID, getNextSequenceNumber(), sd)); - returnIDs = getOtherMemberIDs(); - } - } else { - // If the create msg is directed to this space, no msg will be sent - if (getID().equals(toContainerID)) { - returnIDs = new ID[0]; - } else { - sendMessage(ContainerMessage.makeSharedObjectCreateMessage( - getID(), toContainerID, getNextSequenceNumber(), sd)); - returnIDs = new ID[1]; - returnIDs[0] = toContainerID; - } - } - return returnIDs; - } - protected void sendCreateResponseSharedObjectMessage(ID toContainerID, - ID fromSharedObject, Throwable t, long ident) throws IOException { - sendMessage(ContainerMessage.makeSharedObjectCreateResponseMessage( - getID(), toContainerID, getNextSequenceNumber(), - fromSharedObject, t, ident)); - } - protected void sendSharedObjectMessage(ID toContainerID, - ID fromSharedObject, Serializable data) throws IOException { - sendMessage(ContainerMessage.makeSharedObjectMessage(getID(), - toContainerID, getNextSequenceNumber(), fromSharedObject, data)); - } - protected void sendDisposeSharedObjectMessage(ID toContainerID, - ID fromSharedObject) throws IOException { - sendMessage(ContainerMessage.makeSharedObjectDisposeMessage(getID(), - toContainerID, getNextSequenceNumber(), fromSharedObject)); - } - public SOContainer(ISharedObjectContainerConfig config) { - if (config == null) - throw new InstantiationError("config must not be null"); - this.config = config; - groupManager = new SOContainerGMM(this, new Member(config.getID())); - sharedObjectManager = new SOManager(this); - loadingThreadGroup = getLoadingThreadGroup(); - sharedObjectThreadGroup = getSharedObjectThreadGroup(); - listeners = new Vector(); - receiver = new MessageReceiver(); - } - protected ISynchAsynchConnectionEventHandler getReceiver() { - return receiver; - } - protected boolean isClosing() { - return isClosing; - } - protected void setIsClosing() { - isClosing = true; - } - protected ThreadGroup getLoadingThreadGroup() { - return new ThreadGroup(getID() + ":Loading"); - } - protected ThreadGroup getSharedObjectThreadGroup() { - return new ThreadGroup(getID() + ":SOs"); - } - - public ID getID() { - return config.getID(); - } - - protected int getMaxGroupMembers() { - return groupManager.getMaxMembers(); - } - protected void setMaxGroupMembers(int max) { - groupManager.setMaxMembers(max); - } - - protected void notifySharedObjectActivated(ID sharedObjectID) { - groupManager.notifyOthersActivated(sharedObjectID); - } - protected void notifySharedObjectDeactivated(ID sharedObjectID) { - groupManager.notifyOthersDeactivated(sharedObjectID); - } - - protected boolean destroySharedObject(ID sharedObjectID) { - return groupManager.removeSharedObject(sharedObjectID); - } - - protected IOSGIService getOSGIServiceInterface() { - return null; - } - protected void sendCreate(ID sharedObjectID, ID toContainerID, - SharedObjectDescription sd) throws IOException { - sendCreateSharedObjectMessage(toContainerID, sd); - } - protected void sendDispose(ID toContainerID, ID sharedObjectID) - throws IOException { - sendDisposeSharedObjectMessage(toContainerID, sharedObjectID); - } - protected void sendMessage(ID toContainerID, ID sharedObjectID, - Object message) throws IOException { - if (message == null) - return; - if (message instanceof Serializable) - throw new NotSerializableException(message.getClass().getName()); - sendSharedObjectMessage(toContainerID, sharedObjectID, - (Serializable) message); - } - protected void sendCreateResponse(ID homeID, ID sharedObjectID, - Throwable t, long identifier) throws IOException { - sendCreateResponseSharedObjectMessage(homeID, sharedObjectID, t, - identifier); - } - protected Thread getNewSharedObjectThread(ID sharedObjectID, - Runnable runnable) { - return new Thread(sharedObjectThreadGroup, runnable, getID().getName() - + ";" + sharedObjectID.getName()); - } - protected ISharedObject load(SharedObjectDescription sd) throws Exception { - return sharedObjectManager.loadSharedObject(sd); - } - protected ID[] getSharedObjectIDs() { - return groupManager.getSharedObjectIDs(); - } - protected SOConfig makeSharedObjectConfig(SharedObjectDescription sd, - ISharedObject obj) { - return new SOConfig(sd.getID(), sd.getHomeID(), this, sd - .getProperties()); - } - protected void moveFromLoadingToActive(SOWrapper wrap) { - groupManager.moveSharedObjectFromLoadingToActive(wrap); - } - protected void removeFromLoading(ID id) { - groupManager.removeSharedObjectFromLoading(id); - } - - protected void processDisconnect(DisconnectConnectionEvent e) { - // XXX TODO - } - protected void processAsynch(AsynchConnectionEvent e) { - // XXX TODO - } - protected Serializable processSynch(SynchConnectionEvent e) - throws IOException { - // XXX TODO - return null; - } - class LoadingSharedObject implements ISharedObject { - - SharedObjectDescription description; - Thread runner = null; - - LoadingSharedObject(SharedObjectDescription sd) { - this.description = sd; - } - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObject#init(org.eclipse.ecf.core.ISharedObjectConfig) - */ - public void init(ISharedObjectConfig initData) - throws SharedObjectInitException { - } - ID getID() { - return description.getID(); - } - - ID getHomeID() { - return description.getHomeID(); - } - - void start() { - if (runner == null) { - runner = (Thread) AccessController - .doPrivileged(new PrivilegedAction() { - public Object run() { - return getThread(); - } - }); - runner.setDaemon(true); - runner.start(); - } - - } - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObject#handleEvent(org.eclipse.ecf.core.util.Event) - */ - public void handleEvent(Event event) { - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[]) - */ - public void handleEvents(Event[] events) { - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID) - */ - public void dispose(ID containerID) { - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class) - */ - public Object getAdapter(Class clazz) { - return null; - } - Thread getThread() { - return new Thread(loadingThreadGroup, new Runnable() { - public void run() { - try { - if (Thread.currentThread().isInterrupted() - || isClosing()) - throw new InterruptedException( - "Loading interrupted for object " - + getID().getName()); - // First load given object - ISharedObject obj = load(description); - // Get config info for new object - SOConfig aConfig = makeSharedObjectConfig(description, - obj); - // Call init method on new object. - obj.init(aConfig); - // Check to make sure thread has not been - // interrupted...if it has, throw - if (Thread.currentThread().isInterrupted() - || isClosing()) - throw new InterruptedException( - "Loading interrupted for object " - + getID().getName()); - - // Create meta object and move from loading to active - // list. - SOContainer.this.moveFromLoadingToActive(new SOWrapper( - aConfig, obj, SOContainer.this)); - } catch (Exception e) { - SOContainer.this.removeFromLoading(getID()); - try { - sendCreateResponse(getHomeID(), getID(), e, - description.getIdentifier()); - } catch (Exception e1) { - } - } - } - }, "LRunner" + getID().getName()); - } - - } - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#getConfig() - */ - public ISharedObjectContainerConfig getConfig() { - return config; - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#addListener(org.eclipse.ecf.core.ISharedObjectContainerListener, - * java.lang.String) - */ - public void addListener(ISharedObjectContainerListener l, String filter) { - synchronized (listeners) { - listeners.add(l); - } - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#removeListener(org.eclipse.ecf.core.ISharedObjectContainerListener) - */ - public void removeListener(ISharedObjectContainerListener l) { - synchronized (listeners) { - listeners.remove(l); - } - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#dispose(long) - */ - public void dispose(long waittime) { - isClosing = true; - // XXX Notify listeners that we're going away - // Clear group manager - groupManager.removeAllMembers(); - groupManager = null; - // Clear shared object manager - sharedObjectManager.dispose(); - sharedObjectManager = null; - sharedObjectThreadGroup.interrupt(); - sharedObjectThreadGroup = null; - loadingThreadGroup.interrupt(); - loadingThreadGroup = null; - listeners.clear(); - listeners = null; - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#joinGroup(org.eclipse.ecf.core.identity.ID, - * java.lang.Object) - */ - public abstract void joinGroup(ID groupID, Object loginData) - throws SharedObjectContainerJoinException; - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#leaveGroup() - */ - public abstract void leaveGroup(); - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#getGroupID() - */ - public abstract ID getGroupID(); - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#getGroupMemberIDs() - */ - public ID[] getGroupMemberIDs() { - return groupManager.getMemberIDs(); - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#isGroupManager() - */ - public abstract boolean isGroupManager(); - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#isGroupServer() - */ - public abstract boolean isGroupServer(); - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#getSharedObjectManager() - */ - public ISharedObjectManager getSharedObjectManager() { - return sharedObjectManager; - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.core.ISharedObjectContainer#getAdapter(java.lang.Class) - */ - public Object getAdapter(Class adapter) { - return null; - } - - protected ClassLoader getClassLoaderForContainer() { - return this.getClass().getClassLoader(); - } - /** - * @param sd - * @return - */ - protected ClassLoader getClassLoaderForSharedObject( - SharedObjectDescription sd) { - if (sd != null) { - ClassLoader cl = sd.getClassLoader(); - if (cl != null) - return cl; - else - return getClassLoaderForContainer(); - } else - return getClassLoaderForContainer(); - } - /** - * @param sd - * @return - */ - public Object[] getArgsFromProperties(SharedObjectDescription sd) { - if (sd == null) - return null; - Map aMap = sd.getProperties(); - if (aMap == null) - return null; - Object obj = aMap.get(DEFAULT_OBJECT_ARG_KEY); - if (obj == null) - return null; - if (obj instanceof Object[]) { - Object[] ret = (Object[]) obj; - aMap.remove(DEFAULT_OBJECT_ARG_KEY); - return ret; - } else - return null; - } - /** - * @param sd - * @return - */ - public String[] getArgTypesFromProperties(SharedObjectDescription sd) { - if (sd == null) - return null; - Map aMap = sd.getProperties(); - if (aMap == null) - return null; - Object obj = aMap.get(DEFAULT_OBJECT_ARGTYPES_KEY); - if (obj == null) - return null; - if (obj instanceof String[]) { - String[] ret = (String[]) obj; - aMap.remove(DEFAULT_OBJECT_ARGTYPES_KEY); - return ret; - } else - return null; - } - - class MessageReceiver implements ISynchAsynchConnectionEventHandler { - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.internal.comm.ISynchConnectionEventHandler#handleSynchEvent(org.eclipse.ecf.internal.comm.SynchConnectionEvent) - */ - public Object handleSynchEvent(SynchConnectionEvent event) - throws IOException { - return processSynch(event); - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#handleSuspectEvent(org.eclipse.ecf.internal.comm.ConnectionEvent) - */ - public boolean handleSuspectEvent(ConnectionEvent event) { - return false; - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#handleDisconnectEvent(org.eclipse.ecf.internal.comm.ConnectionEvent) - */ - public void handleDisconnectEvent(DisconnectConnectionEvent event) { - processDisconnect(event); - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#getAdapter(java.lang.Class) - */ - public Object getAdapter(Class clazz) { - return null; - } - - /* - * (non-Javadoc) - * - * @see org.eclipse.ecf.internal.comm.IAsynchConnectionEventHandler#handleAsynchEvent(org.eclipse.ecf.internal.comm.AsynchConnectionEvent) - */ - public void handleAsynchEvent(AsynchConnectionEvent event) - throws IOException { - processAsynch(event); - } - - } + static Trace debug = Trace.create("container"); + + public static final String DEFAULT_OBJECT_ARG_KEY = SOContainer.class + .getName() + + ".sharedobjectargs"; + + public static final String DEFAULT_OBJECT_ARGTYPES_KEY = SOContainer.class + .getName() + + ".sharedobjectargs"; + + private long sequenceNumber = 0L; + + private Vector listeners = null; + + protected ISharedObjectContainerConfig config = null; + + protected SOContainerGMM groupManager = null; + + protected ThreadGroup loadingThreadGroup = null; + + protected ThreadGroup sharedObjectThreadGroup = null; + + protected SOManager sharedObjectManager = null; + + protected boolean isClosing = false; + + protected MessageReceiver receiver; + + protected void debug(String msg) { + if (Trace.ON && debug != null) { + debug.msg(msg + ":" + config.getID()); + } + } + + protected void dumpStack(String msg, Throwable e) { + if (Trace.ON && debug != null) { + debug.dumpStack(e, msg + ":" + config.getID()); + } + } + + protected byte[] getBytesForObject(Serializable obj) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(obj); + return bos.toByteArray(); + } + + protected ContainerMessage getObjectFromBytes(byte[] bytes) + throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bis); + Object obj = null; + try { + obj = ois.readObject(); + } catch (ClassNotFoundException e) { + // what to do? + dumpStack("class not found for message", e); + return null; + } + if (obj instanceof ContainerMessage) { + return (ContainerMessage) obj; + } else { + // what to do + return null; + } + } + + protected void killConnection(IConnection conn) { + debug("killconnection"); + try { + if (conn != null) + conn.disconnect(); + } catch (IOException e) { + // XXX log + } + } + + protected void memberLeave(ID target, IConnection conn) { + debug("memberLeave:" + target + ":" + conn); + if (conn != null) + killConnection(conn); + } + + protected void fireContainerEvent(IContainerEvent event) { + synchronized (listeners) { + for (Iterator i = listeners.iterator(); i.hasNext();) + ((ISharedObjectContainerListener) i.next()).handleEvent(event); + } + } + + protected long getNextSequenceNumber() { + if (sequenceNumber == Long.MAX_VALUE) { + sequenceNumber = 0; + return sequenceNumber; + } else + return sequenceNumber++; + } + + protected Object getGroupMembershipLock() { + return groupManager; + } + + public ID[] getOtherMemberIDs() { + return groupManager.getOtherMemberIDs(); + } + + protected ISharedObject getSharedObject(ID id) { + SOWrapper wrap = getSharedObjectWrapper(id); + if (wrap == null) + return null; + else + return wrap.getSharedObject(); + } + + protected ISharedObject addSharedObjectAndWait(SharedObjectDescription sd, + ISharedObject s, ISharedObjectContainerTransaction t) + throws Exception { + if (sd.getID() == null || s == null) + return null; + ISharedObject so = addSharedObject0(sd, s); + // Wait right here until committed + if (t != null) + t.waitToCommit(); + return s; + } + + protected ISharedObject addSharedObject0(SharedObjectDescription sd, + ISharedObject s) throws Exception { + addSharedObjectWrapper(makeNewSharedObjectWrapper(sd, s)); + return s; + } + + protected SOWrapper makeNewSharedObjectWrapper(SharedObjectDescription sd, + ISharedObject s) { + SOConfig newConfig = makeNewSharedObjectConfig(sd, this); + return new SOWrapper(newConfig, s, this); + } + + protected SOConfig makeNewSharedObjectConfig(SharedObjectDescription sd, + SOContainer cont) { + ID homeID = sd.getHomeID(); + if (homeID == null) + homeID = getID(); + return new SOConfig(sd.getID(), homeID, this, sd.getProperties()); + } + + protected SOWrapper getSharedObjectWrapper(ID id) { + return groupManager.getFromActive(id); + } + + protected void addSharedObjectWrapper(SOWrapper wrapper) throws Exception { + if (wrapper == null) + return; + ID id = wrapper.getObjID(); + synchronized (getGroupMembershipLock()) { + Object obj = groupManager.getFromAny(id); + if (obj != null) { + throw new SharedObjectAddException("SharedObject with id " + + id.getName() + " already in container"); + } + // Call initialize. If this throws it halts everything + wrapper.init(); + // Put in table + groupManager.addSharedObjectToActive(wrapper); + } + } + + protected ISharedObject removeSharedObject(ID id) { + synchronized (getGroupMembershipLock()) { + SOWrapper wrap = groupManager.getFromActive(id); + if (wrap == null) + return null; + groupManager.removeSharedObject(id); + return wrap.getSharedObject(); + } + } + + protected boolean addNewRemoteMember(ID memberID, Object data) { + debug("addNewRemoteMember:" + memberID); + return groupManager.addMember(new Member(memberID, data)); + } + + abstract protected void queueContainerMessage(ContainerMessage mess) + throws IOException; + + abstract protected void forwardToRemote(ID from, ID to, + ContainerMessage data) throws IOException; + + abstract protected void forwardExcluding(ID from, ID excluding, + ContainerMessage data) throws IOException; + + protected final void forward(ID fromID, ID toID, ContainerMessage data) + throws IOException { + if (toID == null) { + forwardExcluding(fromID, fromID, data); + } else { + forwardToRemote(fromID, toID, data); + } + } + + protected boolean removeRemoteMember(ID remoteMember) { + return groupManager.removeMember(remoteMember); + } + + protected void sendMessage(ContainerMessage data) throws IOException { + debug("sendcontainermessage:" + data); + synchronized (getGroupMembershipLock()) { + ID ourID = getID(); + // We don't send to ourselves + if (!ourID.equals(data.getToContainerID())) + queueContainerMessage(data); + } + } + + protected ID[] sendCreateSharedObjectMessage(ID toContainerID, + SharedObjectDescription sd) throws IOException { + ID[] returnIDs = null; + if (toContainerID == null) { + synchronized (getGroupMembershipLock()) { + // Send message to all + sendMessage(ContainerMessage.makeSharedObjectCreateMessage( + getID(), toContainerID, getNextSequenceNumber(), sd)); + returnIDs = getOtherMemberIDs(); + } + } else { + // If the create msg is directed to this space, no msg will be sent + if (getID().equals(toContainerID)) { + returnIDs = new ID[0]; + } else { + sendMessage(ContainerMessage.makeSharedObjectCreateMessage( + getID(), toContainerID, getNextSequenceNumber(), sd)); + returnIDs = new ID[1]; + returnIDs[0] = toContainerID; + } + } + return returnIDs; + } + + protected void sendCreateResponseSharedObjectMessage(ID toContainerID, + ID fromSharedObject, Throwable t, long ident) throws IOException { + sendMessage(ContainerMessage.makeSharedObjectCreateResponseMessage( + getID(), toContainerID, getNextSequenceNumber(), + fromSharedObject, t, ident)); + } + + protected void sendSharedObjectMessage(ID toContainerID, + ID fromSharedObject, Serializable data) throws IOException { + sendMessage(ContainerMessage.makeSharedObjectMessage(getID(), + toContainerID, getNextSequenceNumber(), fromSharedObject, data)); + } + + protected void sendDisposeSharedObjectMessage(ID toContainerID, + ID fromSharedObject) throws IOException { + sendMessage(ContainerMessage.makeSharedObjectDisposeMessage(getID(), + toContainerID, getNextSequenceNumber(), fromSharedObject)); + } + + public SOContainer(ISharedObjectContainerConfig config) { + if (config == null) + throw new InstantiationError("config must not be null"); + this.config = config; + groupManager = new SOContainerGMM(this, new Member(config.getID())); + sharedObjectManager = new SOManager(this); + loadingThreadGroup = getLoadingThreadGroup(); + sharedObjectThreadGroup = getSharedObjectThreadGroup(); + listeners = new Vector(); + receiver = new MessageReceiver(); + debug("<init>"); + } + + protected ISynchAsynchConnectionEventHandler getReceiver() { + return receiver; + } + + protected boolean isClosing() { + return isClosing; + } + + protected void setIsClosing() { + isClosing = true; + } + + protected ThreadGroup getLoadingThreadGroup() { + return new ThreadGroup(getID() + ":Loading"); + } + + protected ThreadGroup getSharedObjectThreadGroup() { + return new ThreadGroup(getID() + ":SOs"); + } + + public ID getID() { + return config.getID(); + } + + protected int getMaxGroupMembers() { + return groupManager.getMaxMembers(); + } + + protected void setMaxGroupMembers(int max) { + groupManager.setMaxMembers(max); + } + + protected void notifySharedObjectActivated(ID sharedObjectID) { + groupManager.notifyOthersActivated(sharedObjectID); + } + + protected void notifySharedObjectDeactivated(ID sharedObjectID) { + groupManager.notifyOthersDeactivated(sharedObjectID); + } + + protected boolean destroySharedObject(ID sharedObjectID) { + return groupManager.removeSharedObject(sharedObjectID); + } + + protected IOSGIService getOSGIServiceInterface() { + return null; + } + + protected void sendCreate(ID sharedObjectID, ID toContainerID, + SharedObjectDescription sd) throws IOException { + sendCreateSharedObjectMessage(toContainerID, sd); + } + + protected void sendDispose(ID toContainerID, ID sharedObjectID) + throws IOException { + sendDisposeSharedObjectMessage(toContainerID, sharedObjectID); + } + + protected void sendMessage(ID toContainerID, ID sharedObjectID, + Object message) throws IOException { + if (message == null) + return; + if (message instanceof Serializable) + throw new NotSerializableException(message.getClass().getName()); + sendSharedObjectMessage(toContainerID, sharedObjectID, + (Serializable) message); + } + + protected void sendCreateResponse(ID homeID, ID sharedObjectID, + Throwable t, long identifier) throws IOException { + sendCreateResponseSharedObjectMessage(homeID, sharedObjectID, t, + identifier); + } + + protected Thread getNewSharedObjectThread(ID sharedObjectID, + Runnable runnable) { + return new Thread(sharedObjectThreadGroup, runnable, getID().getName() + + ";" + sharedObjectID.getName()); + } + + protected ISharedObject load(SharedObjectDescription sd) throws Exception { + return sharedObjectManager.loadSharedObject(sd); + } + + protected ID[] getSharedObjectIDs() { + return groupManager.getSharedObjectIDs(); + } + + protected SOConfig makeSharedObjectConfig(SharedObjectDescription sd, + ISharedObject obj) { + return new SOConfig(sd.getID(), sd.getHomeID(), this, sd + .getProperties()); + } + + protected void moveFromLoadingToActive(SOWrapper wrap) { + groupManager.moveSharedObjectFromLoadingToActive(wrap); + } + + protected void removeFromLoading(ID id) { + groupManager.removeSharedObjectFromLoading(id); + } + + protected void processDisconnect(DisconnectConnectionEvent e) { + debug("processDisconnect:" + e); + try { + ContainerMessage mess = getObjectFromBytes((byte[]) e.getData()); + } catch (IOException except) { + + } + } + protected Object checkCreate(ID fromID, ID toID, long seq, SharedObjectDescription desc) { + // XXX TODO + return desc; + } + protected boolean addToLoading(LoadingSharedObject lso) { + return groupManager.addLoadingSharedObject(lso); + } + protected void handleCreateMessage(ContainerMessage mess) throws IOException { + debug("handleCreateMessage:"+mess); + + SharedObjectDescription desc = (SharedObjectDescription) mess.getData(); + ID fromID = mess.getFromContainerID(); + ID toID = mess.getToContainerID(); + long seq = mess.getSequence(); + Object result = checkCreate(fromID,toID,seq,desc); + if (result != null && (toID == null || toID.equals(getID()))) { + LoadingSharedObject lso = new LoadingSharedObject(desc,result); + synchronized (getGroupMembershipLock()) { + if (!addToLoading(lso)) { + ID sharedObjectID = desc.getID(); + try { + sendCreateResponse(fromID,sharedObjectID,new SharedObjectAddException("shared object "+sharedObjectID),desc.getIdentifier()); + } catch (IOException e) { + // XXX Log this + } + } + forward(fromID, toID, mess); + return; + } + } + synchronized (getGroupMembershipLock()) { + forward(fromID,toID,mess); + } + } + protected void handleCreateResponseMessage(ContainerMessage mess) throws IOException { + debug("handleCreateResponseMessage:"+mess); + ID fromID = mess.getFromContainerID(); + ID toID = mess.getToContainerID(); + long seq = mess.getSequence(); + ContainerMessage.CreateResponseMessage resp = (ContainerMessage.CreateResponseMessage) mess.getData(); + if (toID != null && toID.equals(getID())) { + SOWrapper sow = getSharedObjectWrapper(resp.getSharedObjectID()); + if (sow != null) { + sow.deliverCreateResponse(fromID,resp); + } else { + // XXX log here + } + } else { + forwardToRemote(fromID,toID,mess); + } + } + protected void handleSharedObjectDisposeMessage(ContainerMessage mess) throws IOException { + debug("handleSharedObjectDisposeMessage:"+mess); + ID fromID = mess.getFromContainerID(); + ID toID = mess.getToContainerID(); + long seq = mess.getSequence(); + ContainerMessage.SharedObjectDisposeMessage resp = (ContainerMessage.SharedObjectDisposeMessage) mess.getData(); + ID sharedObjectID = resp.getSharedObjectID(); + synchronized (getGroupMembershipLock()) { + if (groupManager.isLoading(sharedObjectID)) { + groupManager.removeSharedObjectFromLoading(sharedObjectID); + } else { + groupManager.removeSharedObject(sharedObjectID); + } + forward(fromID, toID, mess); + } + } + protected void handleSharedObjectMessage(ContainerMessage mess) throws IOException { + debug("handleSharedObjectMessage:"+mess); + ID fromID = mess.getFromContainerID(); + ID toID = mess.getToContainerID(); + long seq = mess.getSequence(); + ContainerMessage.SharedObjectMessage resp = (ContainerMessage.SharedObjectMessage) mess.getData(); + synchronized (getGroupMembershipLock()) { + if (toID == null || toID.equals(getID())) { + SOWrapper sow = getSharedObjectWrapper(resp.getFromSharedObjectID()); + if (sow != null) { + sow.deliverSharedObjectMessage(fromID,resp.getData()); + } + } + forward(fromID,toID,mess); + } + } + protected abstract void handleViewChangeMessage(ContainerMessage mess) throws IOException; + + protected void handleUnidentifiedMessage(ContainerMessage mess) throws IOException { + // do nothing + } + protected void handleAsynchIOException(IOException except, AsynchConnectionEvent e) { + // If we get IO Exception, we'll disconnect...if we can + killConnection(e.getConnection()); + } + protected void processAsynch(AsynchConnectionEvent e) { + debug("processAsynch:" + e); + try { + ContainerMessage mess = getObjectFromBytes((byte[]) e.getData()); + Serializable submess = mess.getData(); + if (submess != null) { + if (submess instanceof ContainerMessage.CreateMessage) { + handleCreateMessage(mess); + } else if (submess instanceof ContainerMessage.CreateResponseMessage) { + handleCreateResponseMessage(mess); + } else if (submess instanceof ContainerMessage.SharedObjectDisposeMessage) { + handleSharedObjectDisposeMessage(mess); + } else if (submess instanceof ContainerMessage.SharedObjectMessage) { + handleSharedObjectMessage(mess); + } else if (submess instanceof ContainerMessage.ViewChangeMessage) { + handleViewChangeMessage(mess); + } else { + handleUnidentifiedMessage(mess); + } + } else { + handleUnidentifiedMessage(mess); + } + } catch (IOException except) { + handleAsynchIOException(except,e); + } + } + + protected Serializable processSynch(SynchConnectionEvent e) + throws IOException { + debug("processSynch:" + e); + ContainerMessage mess = getObjectFromBytes((byte[]) e.getData()); + ID fromID = mess.getFromContainerID(); + notifyGroupLeave(mess); + synchronized (getGroupMembershipLock()) { + memberLeave(fromID, e.getConnection()); + } + return null; + } + + protected void notifyGroupLeave(ContainerMessage mess) { + // XXX todo + } + + class LoadingSharedObject implements ISharedObject { + + SharedObjectDescription description; + Object credentials; + + Thread runner = null; + + LoadingSharedObject(SharedObjectDescription sd, Object credentials) { + this.description = sd; + this.credentials = credentials; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#init(org.eclipse.ecf.core.ISharedObjectConfig) + */ + public void init(ISharedObjectConfig initData) + throws SharedObjectInitException { + } + + ID getID() { + return description.getID(); + } + + ID getHomeID() { + return description.getHomeID(); + } + + void start() { + if (runner == null) { + runner = (Thread) AccessController + .doPrivileged(new PrivilegedAction() { + public Object run() { + return getThread(); + } + }); + runner.setDaemon(true); + runner.start(); + } + + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#handleEvent(org.eclipse.ecf.core.util.Event) + */ + public void handleEvent(Event event) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[]) + */ + public void handleEvents(Event[] events) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID) + */ + public void dispose(ID containerID) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class clazz) { + return null; + } + + Thread getThread() { + return new Thread(loadingThreadGroup, new Runnable() { + public void run() { + try { + if (Thread.currentThread().isInterrupted() + || isClosing()) + throw new InterruptedException( + "Loading interrupted for object " + + getID().getName()); + // First load given object + ISharedObject obj = load(description); + // Get config info for new object + SOConfig aConfig = makeSharedObjectConfig(description, + obj); + // Call init method on new object. + obj.init(aConfig); + // Check to make sure thread has not been + // interrupted...if it has, throw + if (Thread.currentThread().isInterrupted() + || isClosing()) + throw new InterruptedException( + "Loading interrupted for object " + + getID().getName()); + + // Create meta object and move from loading to active + // list. + SOContainer.this.moveFromLoadingToActive(new SOWrapper( + aConfig, obj, SOContainer.this)); + } catch (Exception e) { + SOContainer.this.removeFromLoading(getID()); + try { + sendCreateResponse(getHomeID(), getID(), e, + description.getIdentifier()); + } catch (Exception e1) { + } + } + } + }, "LRunner" + getID().getName()); + } + + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#getConfig() + */ + public ISharedObjectContainerConfig getConfig() { + return config; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#addListener(org.eclipse.ecf.core.ISharedObjectContainerListener, + * java.lang.String) + */ + public void addListener(ISharedObjectContainerListener l, String filter) { + synchronized (listeners) { + listeners.add(l); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#removeListener(org.eclipse.ecf.core.ISharedObjectContainerListener) + */ + public void removeListener(ISharedObjectContainerListener l) { + synchronized (listeners) { + listeners.remove(l); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#dispose(long) + */ + public void dispose(long waittime) { + debug("dispose:" + waittime); + isClosing = true; + // XXX Notify listeners that we're going away + // Clear group manager + if (groupManager != null) { + groupManager.removeAllMembers(); + groupManager = null; + } + // Clear shared object manager + if (sharedObjectManager != null) { + sharedObjectManager.dispose(); + sharedObjectManager = null; + } + if (sharedObjectThreadGroup != null) { + sharedObjectThreadGroup.interrupt(); + sharedObjectThreadGroup = null; + } + if (loadingThreadGroup != null) { + loadingThreadGroup.interrupt(); + loadingThreadGroup = null; + } + if (listeners != null) { + listeners.clear(); + listeners = null; + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#joinGroup(org.eclipse.ecf.core.identity.ID, + * java.lang.Object) + */ + public abstract void joinGroup(ID groupID, Object loginData) + throws SharedObjectContainerJoinException; + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#leaveGroup() + */ + public abstract void leaveGroup(); + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#getGroupID() + */ + public abstract ID getGroupID(); + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#getGroupMemberIDs() + */ + public ID[] getGroupMemberIDs() { + return groupManager.getMemberIDs(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#isGroupManager() + */ + public abstract boolean isGroupManager(); + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#isGroupServer() + */ + public abstract boolean isGroupServer(); + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#getSharedObjectManager() + */ + public ISharedObjectManager getSharedObjectManager() { + return sharedObjectManager; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObjectContainer#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class adapter) { + return null; + } + + protected ClassLoader getClassLoaderForContainer() { + return this.getClass().getClassLoader(); + } + + /** + * @param sd + * @return + */ + protected ClassLoader getClassLoaderForSharedObject( + SharedObjectDescription sd) { + if (sd != null) { + ClassLoader cl = sd.getClassLoader(); + if (cl != null) + return cl; + else + return getClassLoaderForContainer(); + } else + return getClassLoaderForContainer(); + } + + /** + * @param sd + * @return + */ + public Object[] getArgsFromProperties(SharedObjectDescription sd) { + if (sd == null) + return null; + Map aMap = sd.getProperties(); + if (aMap == null) + return null; + Object obj = aMap.get(DEFAULT_OBJECT_ARG_KEY); + if (obj == null) + return null; + if (obj instanceof Object[]) { + Object[] ret = (Object[]) obj; + aMap.remove(DEFAULT_OBJECT_ARG_KEY); + return ret; + } else + return null; + } + + /** + * @param sd + * @return + */ + public String[] getArgTypesFromProperties(SharedObjectDescription sd) { + if (sd == null) + return null; + Map aMap = sd.getProperties(); + if (aMap == null) + return null; + Object obj = aMap.get(DEFAULT_OBJECT_ARGTYPES_KEY); + if (obj == null) + return null; + if (obj instanceof String[]) { + String[] ret = (String[]) obj; + aMap.remove(DEFAULT_OBJECT_ARGTYPES_KEY); + return ret; + } else + return null; + } + + class MessageReceiver implements ISynchAsynchConnectionEventHandler { + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.internal.comm.ISynchConnectionEventHandler#handleSynchEvent(org.eclipse.ecf.internal.comm.SynchConnectionEvent) + */ + public Object handleSynchEvent(SynchConnectionEvent event) + throws IOException { + return processSynch(event); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#handleSuspectEvent(org.eclipse.ecf.internal.comm.ConnectionEvent) + */ + public boolean handleSuspectEvent(ConnectionEvent event) { + return false; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#handleDisconnectEvent(org.eclipse.ecf.internal.comm.ConnectionEvent) + */ + public void handleDisconnectEvent(DisconnectConnectionEvent event) { + processDisconnect(event); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.internal.comm.IConnectionEventHandler#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class clazz) { + return null; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.internal.comm.IAsynchConnectionEventHandler#handleAsynchEvent(org.eclipse.ecf.internal.comm.AsynchConnectionEvent) + */ + public void handleAsynchEvent(AsynchConnectionEvent event) + throws IOException { + processAsynch(event); + } + + } }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java index e1982adf3..720d1d952 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java @@ -6,19 +6,20 @@ import java.security.PrivilegedAction; import org.eclipse.ecf.core.ISharedObject; import org.eclipse.ecf.core.SharedObjectInitException; +import org.eclipse.ecf.core.events.RemoteSharedObjectCreateResponseEvent; +import org.eclipse.ecf.core.events.RemoteSharedObjectEvent; import org.eclipse.ecf.core.events.SharedObjectActivatedEvent; import org.eclipse.ecf.core.events.SharedObjectContainerDepartedEvent; import org.eclipse.ecf.core.events.SharedObjectContainerJoinedEvent; import org.eclipse.ecf.core.events.SharedObjectDeactivatedEvent; import org.eclipse.ecf.core.identity.ID; -import org.eclipse.ecf.core.util.AsynchResult; import org.eclipse.ecf.core.util.Event; import org.eclipse.ecf.core.util.SimpleQueueImpl; import org.eclipse.ecf.provider.Trace; import org.eclipse.ecf.provider.generic.gmm.Member; final class SOWrapper { - static Trace debug = Trace.create(SOWrapper.class.getName()); + static Trace debug = Trace.create("sharedobjectwrapper"); protected ISharedObject sharedObject; private SOConfig sharedObjectConfig; @@ -51,6 +52,7 @@ final class SOWrapper { } void init() throws SharedObjectInitException { + debug("init()"); sharedObject.init(sharedObjectConfig); } ID getObjID() { @@ -62,6 +64,7 @@ final class SOWrapper { } void activated(ID[] ids) { + debug("activated"); // First, make space reference accessible to use by RepObject sharedObjectConfig.makeActive(new QueueEnqueueImpl(queue)); thread = (Thread) AccessController.doPrivileged(new PrivilegedAction() { @@ -77,6 +80,7 @@ final class SOWrapper { } void deactivated() { + debug("deactivated()"); send(new SharedObjectDeactivatedEvent(containerID, sharedObjectID)); container.notifySharedObjectDeactivated(sharedObjectID); destroyed(); @@ -94,6 +98,7 @@ final class SOWrapper { } void otherChanged(ID otherID, boolean activated) { + debug("otherChanged("+otherID+","+activated); if (activated && thread != null) { send(new SharedObjectActivatedEvent(containerID, otherID, null)); } else { @@ -101,6 +106,7 @@ final class SOWrapper { } } void memberChanged(Member m, boolean add) { + debug("memberChanged("+m+","+add); if (thread != null) { if (add) { send(new SharedObjectContainerJoinedEvent(containerID, m @@ -116,19 +122,9 @@ final class SOWrapper { return container.getNewSharedObjectThread(sharedObjectID, new Runnable() { public void run() { - if (Trace.ON && debug != null) { - debug.msg("Starting runner for " + sharedObjectID); - } - // The debug class will associate this thread with - // container - Trace.setThreadDebugGroup(container.getID()); - // Then process messages on queue until interrupted or - // queue closed - //Msg aMsg = null; + debug("runner("+sharedObjectID+")"); Event evt = null; for (;;) { - // make sure the thread hasn't been interrupted and - // get Msg from SimpleQueueImpl if (Thread.currentThread().isInterrupted()) break; @@ -145,28 +141,15 @@ final class SOWrapper { SOWrapper.this.doDestroy(); } } catch (Throwable t) { - if (Trace.ON && debug != null) { - debug.dumpStack(t, - "Exception executing event " + evt - + " on meta " + this); - } handleRuntimeException(t); } } // If the thread was interrupted, then show appropriate // spam if (Thread.currentThread().isInterrupted()) { - if (Trace.ON && debug != null) { - debug - .msg("Runner for " - + sharedObjectID - + " terminating after being interrupted"); - } + debug("runner("+sharedObjectID+") terminating interrupted"); } else { - if (Trace.ON && debug != null) { - debug.msg("Runner for " + sharedObjectID - + " terminating normally"); - } + debug("runner("+sharedObjectID+") terminating normally"); } } }); @@ -195,31 +178,11 @@ final class SOWrapper { sharedObject.dispose(containerID); } - //void createMsgResp(ID fromID, ContainerMessage.CreateResponse resp) { - /* - * if (sharedObjectConfig.getMsgMask().get(MsgMask.CREATERESPONSE) && thread != - * null) { send( Msg.makeMsg( null, CREATE_RESP_RCVD, fromID, resp.myExcept, - * new Long(resp.mySeq))); } - */ - //} - void deliverObjectFromRemote(ID fromID, Serializable data) { - // If we have a container, forward message onto container - /* - * if (myContainerID != null) { forwardToContainer( Msg.makeMsg(null, - * REMOTE_REPOBJ_MSG, fromID, data)); // otherwise, send to our object - * (assuming it has thread and that it wants to receive message) } else - * if ( sharedObjectConfig.getMsgMask().get(MsgMask.REMOTEDATA) && - * thread != null) { send(Msg.makeMsg(null, REMOTE_REPOBJ_MSG, fromID, - * data)); } - */ + void deliverSharedObjectMessage(ID fromID, Serializable data) { + send(new RemoteSharedObjectEvent(getObjID(),fromID,data)); } - - void forwardToContainer(Event msg) { - /* - * try { container.deliverForwardToRepObject(sharedObjectID, - * myContainerID, msg); } catch (Exception e) { - * handleRuntimeException(e); } - */ + void deliverCreateResponse(ID fromID, ContainerMessage.CreateResponseMessage resp) { + send(new RemoteSharedObjectCreateResponseEvent(resp.getSharedObjectID(),fromID,resp.getSequence(),resp.getException())); } void deliverEventFromSharedObject(ID fromID, Event evt) { /* @@ -230,16 +193,6 @@ final class SOWrapper { * null) { send(Msg.makeMsg(null, REPOBJ_MSG, fromID, msg)); } */ } - void deliverRequestFromRepObject(ID fromID, Event evt, AsynchResult future) { - /* - * if (myContainerID != null) { forwardToContainer( Msg.makeMsg(null, - * REPOBJ_REQ, fromID, msg, future)); } else if ( - * sharedObjectConfig.getMsgMask().get(MsgMask.REPOBJMSG) && thread != - * null) { // Check to see that messages may be received...determined by - * the REPOBJMSG // bit in msg mask send(Msg.makeMsg(null, REPOBJ_REQ, - * fromID, msg, future)); } - */ - } void deliverForwardedMsg(ID fromID, Event evt) { /* * if (myContainerID != null) { forwardToContainer(Msg.makeMsg(null, @@ -267,16 +220,18 @@ final class SOWrapper { sb.append("SharedObjectWrapper[").append(getObjID()).append("]"); return sb.toString(); } - void handleRuntimeException(Throwable except) { + protected void debug(String msg) { if (Trace.ON && debug != null) { - debug.dumpStack(except, "handleRuntimeException called for " - + sharedObjectID); - } - try { - Trace.errDumpStack(except, "handleRuntimeException called for " - + sharedObjectID); - } catch (Throwable e) { - } + debug.msg(msg); + } + } + protected void dumpStack(String msg, Throwable e) { + if (Trace.ON && debug != null) { + debug.dumpStack(e,msg); + } + } + void handleRuntimeException(Throwable except) { + dumpStack("runner:unhandledexception("+sharedObjectID.getName()+")",except); } /** * @return diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java index 0c3e9c5d0..14b17c3e9 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java @@ -71,10 +71,9 @@ public class ServerSOContainer extends SOContainer { } } } - protected void handleChangeMsg(ID fromID, ID toID, long seqNum, - Serializable data) throws IOException { + protected void handleViewChangeMessage(ContainerMessage mess) throws IOException { // Server should never receive change messages - } + } public void joinGroup(ID group, Object data) throws SharedObjectContainerJoinException { diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java index 2acc08558..607a157ac 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPClientSOContainer.java @@ -24,6 +24,7 @@ public class TCPClientSOContainer extends ClientSOContainer { protected ISynchAsynchConnection getClientConnection(ID remoteSpace, Object data) throws ConnectionInstantiationException { + debug("getClientConnection:"+remoteSpace+":"+data); Object[] args = { new Integer(keepAlive) }; ISynchAsynchConnection conn = null; conn = ConnectionFactory.makeSynchAsynchConnection(receiver, DEFAULT_COMM_NAME, args); diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java index e1a9df9ad..7387c21f0 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/TCPServerSOContainerGroup.java @@ -25,7 +25,7 @@ public class TCPServerSOContainerGroup extends SOContainerGroup implements public static final String INVALID_CONNECT = "Invalid connect request. "; public static final Trace debug = Trace - .create(TCPServerSOContainerGroup.class.getName()); + .create("connection"); public static final String DEFAULT_GROUP_NAME = TCPServerSOContainerGroup.class .getName(); @@ -48,10 +48,18 @@ public class TCPServerSOContainerGroup extends SOContainerGroup implements this(DEFAULT_GROUP_NAME, null, port); } - public synchronized void putOnTheAir() throws IOException { + protected void debug(String msg) { if (Trace.ON && debug != null) { - debug.msg("Putting group " + this + " on the air."); - } + debug.msg(msg); + } + } + protected void dumpStack(String msg, Throwable e) { + if (Trace.ON && debug != null) { + debug.dumpStack(e,msg); + } + } + public synchronized void putOnTheAir() throws IOException { + debug("group at port "+port+" on the air"); listener = new Server(threadGroup, port, this); port = listener.getLocalPort(); isOnTheAir = true; @@ -72,7 +80,7 @@ public class TCPServerSOContainerGroup extends SOContainerGroup implements ConnectRequestMessage req = (ConnectRequestMessage) iStream .readObject(); if (Trace.ON && debug != null) { - debug.msg("Got connect request " + req); + debug.msg("serverrecv:" + req); } if (req == null) throw new InvalidObjectException(INVALID_CONNECT @@ -88,10 +96,7 @@ public class TCPServerSOContainerGroup extends SOContainerGroup implements + "Target path is null"); TCPServerSOContainer srs = (TCPServerSOContainer) get(path); - if (Trace.ON && debug != null) { - debug.msg("Found container with " + srs.getID().getName() - + " for target " + uri); - } + debug("found container:"+srs.getID().getName()+" for target "+uri); if (srs == null) throw new InvalidObjectException("Container for target " + path + " not found!"); diff --git a/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/events/ISharedObjectCreateResponseEvent.java b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/events/ISharedObjectCreateResponseEvent.java new file mode 100644 index 000000000..31fdfdcd6 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/events/ISharedObjectCreateResponseEvent.java @@ -0,0 +1,15 @@ +/******************************************************************************* + * Copyright (c) 2004 Composent, Inc. and others. All rights reserved. This + * program and the accompanying materials are made available under the terms of + * the Eclipse Public License v1.0 which accompanies this distribution, and is + * available at http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: Composent, Inc. - initial API and implementation + ******************************************************************************/ + +package org.eclipse.ecf.core.events; + +public interface ISharedObjectCreateResponseEvent extends ISharedObjectMessageEvent { + public long getSequence(); + public Throwable getException(); +} diff --git a/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/events/RemoteSharedObjectCreateResponseEvent.java b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/events/RemoteSharedObjectCreateResponseEvent.java new file mode 100644 index 000000000..7de1c8a21 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/events/RemoteSharedObjectCreateResponseEvent.java @@ -0,0 +1,28 @@ +package org.eclipse.ecf.core.events; + +import org.eclipse.ecf.core.identity.ID; + +public class RemoteSharedObjectCreateResponseEvent extends RemoteSharedObjectEvent + implements ISharedObjectCreateResponseEvent { + + long sequence = 0; + + public RemoteSharedObjectCreateResponseEvent(ID senderObj, ID remoteCont, long seq, Throwable exception) { + super(senderObj,remoteCont,exception); + this.sequence = seq; + } + /* (non-Javadoc) + * @see org.eclipse.ecf.core.events.ISharedObjectCreateResponseEvent#getSequence() + */ + public long getSequence() { + return sequence; + } + + /* (non-Javadoc) + * @see org.eclipse.ecf.core.events.ISharedObjectCreateResponseEvent#getException() + */ + public Throwable getException() { + return (Throwable) getData(); + } + +} |
