diff options
| author | slewis | 2005-06-26 22:26:55 +0000 |
|---|---|---|
| committer | slewis | 2005-06-26 22:26:55 +0000 |
| commit | 499967802ecea039cd8156166fbe1e8fa583a057 (patch) | |
| tree | a808feb9bf577ae70f84d98e39f240be46122e2a | |
| parent | d094649c4a71ac553f00b11102c0e11027e7bca6 (diff) | |
| download | org.eclipse.ecf-499967802ecea039cd8156166fbe1e8fa583a057.tar.gz org.eclipse.ecf-499967802ecea039cd8156166fbe1e8fa583a057.tar.xz org.eclipse.ecf-499967802ecea039cd8156166fbe1e8fa583a057.zip | |
Added code to support IConnectionEventHandler.getEventHandlerID. Fixed tracing and increased tracing support in Client comm/tcp code.
4 files changed, 102 insertions, 65 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/.options b/framework/bundles/org.eclipse.ecf.provider/.options index 19965ce51..5b80749fb 100644 --- a/framework/bundles/org.eclipse.ecf.provider/.options +++ b/framework/bundles/org.eclipse.ecf.provider/.options @@ -2,7 +2,6 @@ org.eclipse.ecf.provider/debug = true org.eclipse.ecf.provider/debug/filter = * org.eclipse.ecf.provider/debug/flag = true org.eclipse.ecf.provider/debug/connection = true -org.eclipse.ecf.provider/debug/connectionping = false org.eclipse.ecf.provider/debug/container = true org.eclipse.ecf.provider/debug/sharedobjectwrapper = true org.eclipse.ecf.provider/debug/sharedobjectcontext = true diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java index 8a228c7d6..2f77df1c4 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java @@ -18,7 +18,7 @@ import org.eclipse.core.runtime.Platform; public class Trace { public static final String tracePrefix = "(trace)"; - public static boolean ON = false; + public static boolean ON = true; protected static boolean isEclipse = false; protected static String pluginName = ""; protected static String debugPrefix = "/debug/"; diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java index 08a715513..44161cc76 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java @@ -18,6 +18,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.net.ConnectException; import java.net.Socket; +import java.net.SocketException; import java.net.URI; import java.net.URISyntaxException; import java.security.AccessController; @@ -25,6 +26,7 @@ import java.security.PrivilegedAction; import java.util.Enumeration; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.Vector; import org.eclipse.ecf.core.comm.AsynchConnectionEvent; import org.eclipse.ecf.core.comm.ConnectionDescription; @@ -72,17 +74,19 @@ public final class Client implements ISynchAsynchConnection { } public static final String PROTOCOL = "ecftcp"; - public static final Trace debug = Trace.create("connection"); + protected static final Trace trace = Trace.create("connection"); public static final int SNDR_PRIORITY = Thread.NORM_PRIORITY; public static final int RCVR_PRIORITY = Thread.NORM_PRIORITY; // Default close timeout is 1.5 seconds - public static final long CLOSE_TIMEOUT = 1500; + public static final long CLOSE_TIMEOUT = 2000; public static final int DEF_MAX_MSG = 50; - protected String address; - protected int port; + protected Socket socket; + private String addressPort = "-1:<no endpoint>:-1"; + protected ObjectOutputStream outputStream; protected ObjectInputStream inputStream; + protected ISynchAsynchConnectionEventHandler handler; protected SimpleQueueImpl queue = new SimpleQueueImpl(); protected int keepAlive = 0; @@ -98,22 +102,27 @@ public final class Client implements ISynchAsynchConnection { protected long closeTimeout = CLOSE_TIMEOUT; protected Vector eventNotify = null; protected Map properties; - + + protected ID containerID = null; + public Map getProperties() { return properties; } public Object getAdapter(Class clazz) { return null; } - protected void debug(String msg) { - if (Trace.ON && debug != null) { - debug.msg(msg); + private String getAddressPort() { + return addressPort; + } + protected void trace(String msg) { + if (Trace.ON && trace != null) { + trace.msg(getAddressPort()+";"+msg); } } protected void dumpStack(String msg, Throwable e) { - if (Trace.ON && debug != null) { - debug.dumpStack(e, msg); + if (Trace.ON && trace != null) { + trace.dumpStack(e, getAddressPort()+";"+msg); } } @@ -127,19 +136,25 @@ public final class Client implements ISynchAsynchConnection { throws IOException { this(aSocket, iStream, oStream, handler, keepAlive, DEF_MAX_MSG); } - + private void setSocket(Socket s) throws SocketException { + socket = s; + if (s != null) { + // Set socket options + s.setTcpNoDelay(true); + addressPort = s.getLocalPort()+":"+s.getInetAddress().getHostName()+":"+s.getPort(); + } else { + addressPort = "-1:<no endpoint>:-1"; + } + } public Client(Socket aSocket, ObjectInputStream iStream, ObjectOutputStream oStream, ISynchAsynchConnectionEventHandler handler, int keepAlive, int maxmsgs) throws IOException { - socket = aSocket; - // Set TCP no delay - socket.setTcpNoDelay(true); - address = socket.getInetAddress().getHostName(); - port = socket.getPort(); + setSocket(socket); inputStream = iStream; outputStream = oStream; this.handler = handler; + containerID = handler.getEventHandlerID(); this.keepAlive = keepAlive; maxMsg = maxmsgs; properties = new Properties(); @@ -157,18 +172,20 @@ public final class Client implements ISynchAsynchConnection { public Client(ISynchAsynchConnectionEventHandler handler, int keepAlive, int maxmsgs) { this.handler = handler; + containerID = handler.getEventHandlerID(); this.keepAlive = keepAlive; maxMsg = maxmsgs; this.properties = new Properties(); } public synchronized ID getLocalID() { + if (containerID != null) return containerID; if (socket == null) return null; ID retID = null; try { retID = IDFactory.makeStringID(PROTOCOL + "://" - + socket.getLocalAddress().getHostName() + ":" + port); + + socket.getLocalAddress().getHostName() + ":" + socket.getLocalPort()); } catch (Exception e) { return null; } @@ -216,48 +233,46 @@ public final class Client implements ISynchAsynchConnection { public synchronized Object connect(ID remote, Object data, int timeout) throws IOException { - debug("connect(" + remote + "," + data + "," + timeout + ")"); + trace("connect(" + remote + "," + data + "," + timeout + ")"); if (socket != null) - throw new ConnectException("ClientApplication already connected"); + throw new ConnectException("Already connected to "+getAddressPort()); + // parse URI URI anURI = null; try { anURI = remote.toURI(); } catch (URISyntaxException e) { - throw new IOException("Can't connect to address " - + remote.getName() + ". Invalid URL"); + throw new IOException("Invalid URL"); } - address = anURI.getHost(); - port = anURI.getPort(); + // Get socket factory and create/connect socket SocketFactory fact = SocketFactory.getSocketFactory(); if (fact == null) { fact = SocketFactory.getDefaultSocketFactory(); } - debug("socket connecting to " + address + ":" + port); - // Actually connect to remote using socket from socket factory. - socket = fact.createSocket(address, port, timeout); - // Set TCP no delay - socket.setTcpNoDelay(true); - boolean compatibility = false; - //boolean compatibility = TCPCompatibility.useCompatibility; - outputStream = new ExObjectOutputStream(new BufferedOutputStream(socket - .getOutputStream()), compatibility); + Socket s = fact.createSocket(anURI.getHost(), anURI.getPort(), timeout); + // Now we've got a connection so set our socket + setSocket(s); + outputStream = new ExObjectOutputStream(new BufferedOutputStream(s + .getOutputStream()), false); outputStream.flush(); - inputStream = new ExObjectInputStream(socket.getInputStream(), - compatibility); - // send connect data - debug("send:" + address + ":" + port + ":" + data); + inputStream = new ExObjectInputStream(s.getInputStream(), + false); + trace("connect;" + anURI); + // send connect data and get syncronous response sendIt(new ConnectRequestMessage(anURI, (Serializable) data)); ConnectResultMessage res = null; res = (ConnectResultMessage) readObject(); - debug("recv:" + address + ":" + port + ":" + res); + trace("connect;rcv:"+ res); // Setup threads setupThreads(); // Return results. - return res.getData(); + Object ret = res.getData(); + trace("connect;returning:"+ret); + return ret; } private void setupThreads() { // Setup threads + trace("setupThreads()"); sendThread = (Thread) AccessController .doPrivileged(new PrivilegedAction() { public Object run() { @@ -286,7 +301,7 @@ public final class Client implements ISynchAsynchConnection { break; try { // Actually send message - debug("send:" + address + ":" + port + ":" + aMsg); + trace("send:" + aMsg); sendIt(aMsg); // Successful...remove message from queue queue.removeHead(); @@ -299,11 +314,13 @@ public final class Client implements ISynchAsynchConnection { msgCount++; } catch (IOException e) { if (isClosing) { - isClosing = false; + //isClosing = false; + dumpStack("SENDER CLOSING",e); synchronized (Client.this) { Client.this.notifyAll(); } } else { + dumpStack("SENDER EXCEPTION",e); if (!handler.handleSuspectEvent(new ConnectionEvent( Client.this, e))) { handler @@ -314,9 +331,9 @@ public final class Client implements ISynchAsynchConnection { break; } } - debug("sender:" + address + ":" + port + " terminating"); + trace("SENDER TERMINATING"); } - }, "sndr:" + address + ":" + port); + }, getLocalID() + ":sndr:" + getAddressPort()); // Set priority for new thread aThread.setPriority(SNDR_PRIORITY); return aThread; @@ -324,25 +341,29 @@ public final class Client implements ISynchAsynchConnection { private void closeSocket() { try { - if (socket != null) + if (socket != null) { socket.close(); + setSocket(null); + } } catch (IOException e) { + dumpStack("socket close",e); } } private void sendIt(Serializable snd) throws IOException { // Write object to output stream + trace("sendIt("+snd+")"); synchronized (outputStream) { outputStream.writeObject(snd); outputStream.flush(); - nextPingTime = System.currentTimeMillis() + keepAlive; + nextPingTime = System.currentTimeMillis() + (keepAlive/2); } } private void receiveResp() { synchronized (outputStream) { waitForPing = false; - nextPingTime = System.currentTimeMillis() + keepAlive; + nextPingTime = System.currentTimeMillis() + (keepAlive/2); outputStream.notifyAll(); } } @@ -353,17 +374,15 @@ public final class Client implements ISynchAsynchConnection { private void sendClose(Serializable snd) throws IOException { isClosing = true; - debug("send:" + address + ":" + port + ":" + snd); + trace("sendClose(" + snd + ")"); sendIt(snd); if (isClosing) { try { wait(closeTimeout); - } catch (Exception e) { + } catch (InterruptedException e) { + dumpStack("sendClose wait",e); } } - // Before returning, actually remove remote objects - //handler.handleDisconnectEvent(new DisconnectConnectionEvent( - //ClientApplication.this, null, queue)); } private Thread getRcvThread() { @@ -378,11 +397,12 @@ public final class Client implements ISynchAsynchConnection { handleRcv(readObject()); } catch (IOException e) { if (isClosing) { - isClosing = false; + dumpStack("RCVR CLOSING",e); synchronized (Client.this) { Client.this.notifyAll(); } } else { + dumpStack("RCVR EXCEPTION",e); if (!handler.handleSuspectEvent(new ConnectionEvent( Client.this, e))) { handler @@ -393,9 +413,9 @@ public final class Client implements ISynchAsynchConnection { break; } } - debug("read:" + address + ":" + port + " terminating"); + trace("RCVR TERMINATING"); } - }, "rcvr:" + address + ":" + port); + }, getLocalID() + ":rcvr:" + getAddressPort()); // Set priority and return aThread.setPriority(RCVR_PRIORITY); return aThread; @@ -409,16 +429,17 @@ public final class Client implements ISynchAsynchConnection { if (rcv instanceof SynchMessage) { // Handle synch message. The only valid synch message is // 'close'. - debug("recv:" + address + ":" + port + ":" + rcv); + trace("recv:" + rcv); handler.handleSynchEvent(new SynchConnectionEvent(this, ((SynchMessage) rcv).getData())); } else if (rcv instanceof AsynchMessage) { - debug("recv:" + address + ":" + port + ":" + rcv); + trace("recv:" + rcv); Serializable d = ((AsynchMessage) rcv).getData(); // Handle asynch messages. handler.handleAsynchEvent(new AsynchConnectionEvent(this, d)); } else if (rcv instanceof PingMessage) { // Handle ping by sending response back immediately + trace("recv:" + rcv); sendIt(pingResp); } else if (rcv instanceof PingResponseMessage) { // Do nothing with ping response @@ -429,9 +450,8 @@ public final class Client implements ISynchAsynchConnection { throw e; } } - public synchronized void start() { - debug("start(" + address + ":" + port + ")"); + trace("start()"); if (sendThread != null) sendThread.start(); if (rcvThread != null) @@ -444,12 +464,21 @@ public final class Client implements ISynchAsynchConnection { } public void stop() { + trace("stop()"); } private Thread setupPing() { + trace("setupPing()"); + final int pingStartWait = (new Random()).nextInt(keepAlive / 2); return new Thread(new Runnable() { public void run() { Thread me = Thread.currentThread(); + // Sleep a random interval to start + try { + Thread.sleep(pingStartWait); + } catch (InterruptedException e) { + return; + } while (!queue.isStopped()) { try { if (me.isInterrupted()) @@ -476,18 +505,18 @@ public final class Client implements ISynchAsynchConnection { } } if (waitForPing) { - throw new IOException(address + ":" + port - + " not reachable."); + throw new IOException(getAddressPort()+ " not reachable"); } } } } catch (Exception e) { if (isClosing) { - isClosing = false; + dumpStack("PING CLOSING",e); synchronized (Client.this) { Client.this.notifyAll(); } } else { + dumpStack("PING EXCEPTION",e); if (!handler.handleSuspectEvent(new ConnectionEvent( Client.this, e))) { handler @@ -498,12 +527,13 @@ public final class Client implements ISynchAsynchConnection { break; } } + trace("PING TERMINATING"); } - }, "ping:" + address + ":" + port); + }, getLocalID()+":ping:"+getAddressPort()); } public synchronized void disconnect() throws IOException { - debug("disconnect(" + address + ":" + port + ")"); + trace("disconnect()"); // Close send queue and socket queue.close(); closeSocket(); @@ -537,6 +567,7 @@ public final class Client implements ISynchAsynchConnection { throws IOException { if (queue.isStopped() || isClosing) throw new ConnectException("Not connected"); + trace("queueObject("+recipient+","+obj+")"); queue.enqueue(new AsynchMessage(obj)); } @@ -544,15 +575,18 @@ public final class Client implements ISynchAsynchConnection { throws IOException { if (queue.isStopped() || isClosing) throw new ConnectException("Not connected"); + trace("queueObject("+recipient+","+obj+")"); sendClose(new SynchMessage(obj)); return null; } public Object sendSynch(ID rec, Object obj) throws IOException { + trace("sendSynch("+rec+","+obj+")"); return sendObject(rec, (Serializable) obj); } public Object sendSynch(ID rec, byte[] obj) throws IOException { + trace("sendSynch("+rec+","+obj+")"); return sendObject(rec, obj); } @@ -561,7 +595,7 @@ public final class Client implements ISynchAsynchConnection { try { ret = (Serializable) inputStream.readObject(); } catch (ClassNotFoundException e) { - dumpStack("ClassNotFoundException", e); + dumpStack("readObject;classnotfoundexception", e); throw new IOException( "Protocol violation due to class load failure. " + e.getMessage()); diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java index 7c5923d23..9dd9f5896 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java @@ -226,6 +226,10 @@ public abstract class SOContainer implements ISharedObjectContainer { throws IOException { return processSynch(event); } + + public ID getEventHandlerID() { + return getID(); + } } static Trace debug = Trace.create("container"); public static final String DEFAULT_OBJECT_ARG_KEY = SOContainer.class |
