diff options
| author | slewis | 2005-08-04 19:27:09 +0000 |
|---|---|---|
| committer | slewis | 2005-08-04 19:27:09 +0000 |
| commit | b341f3f23e4cb0a54e1d161bb99e9469a1c8e016 (patch) | |
| tree | f21ae822f1b0c0fec35a7f1095ef6e31df023579 | |
| parent | b84998541d3bd2c7c618a3c5a96352f18cd33299 (diff) | |
| download | org.eclipse.ecf-b341f3f23e4cb0a54e1d161bb99e9469a1c8e016.tar.gz org.eclipse.ecf-b341f3f23e4cb0a54e1d161bb99e9469a1c8e016.tar.xz org.eclipse.ecf-b341f3f23e4cb0a54e1d161bb99e9469a1c8e016.zip | |
Fix for timing bug in tcp communications code
5 files changed, 36 insertions, 22 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java index 8b98cbdfa..928bb581e 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Vector; + import org.eclipse.ecf.core.comm.AsynchConnectionEvent; import org.eclipse.ecf.core.comm.ConnectionDescription; import org.eclipse.ecf.core.comm.ConnectionEvent; @@ -105,6 +106,8 @@ public final class Client implements ISynchAsynchConnection { protected ID containerID = null; + protected Object pingLock = new Object(); + public Map getProperties() { return properties; } @@ -187,6 +190,7 @@ public final class Client implements ISynchAsynchConnection { retID = IDFactory.getDefault().makeStringID(PROTOCOL + "://" + socket.getLocalAddress().getHostName() + ":" + socket.getLocalPort()); } catch (Exception e) { + dumpStack("Exception in getLocalID()",e); return null; } return retID; @@ -241,7 +245,9 @@ public final class Client implements ISynchAsynchConnection { try { anURI = remote.toURI(); } catch (URISyntaxException e) { - throw new IOException("Invalid URL"); + IOException except = new IOException("Invalid URI for remote "+remote); + except.setStackTrace(e.getStackTrace()); + throw except; } // Get socket factory and create/connect socket SocketFactory fact = SocketFactory.getSocketFactory(); @@ -292,29 +298,27 @@ public final class Client implements ISynchAsynchConnection { public void run() { int msgCount = 0; Thread me = Thread.currentThread(); - // Loop until done sending messages + // Loop until done sending messages (thread explicitly interrupted or queue.peekQueue() returns null for (;;) { if (me.isInterrupted()) break; + // sender should wait here until something appears in queue or queue is stopped (returns null) Serializable aMsg = (Serializable) queue.peekQueue(); if (me.isInterrupted() || aMsg == null) break; try { // Actually send message - trace("send:" + aMsg); sendIt(aMsg); // Successful...remove message from queue queue.removeHead(); if (msgCount > maxMsg) { - synchronized (outputStream) { - outputStream.reset(); - } + outputStream.reset(); msgCount = 0; } else msgCount++; + } catch (IOException e) { if (isClosing) { - //isClosing = false; dumpStack("SENDER CLOSING",e); synchronized (Client.this) { Client.this.notifyAll(); @@ -353,19 +357,16 @@ public final class Client implements ISynchAsynchConnection { private void sendIt(Serializable snd) throws IOException { // Write object to output stream trace("sendIt("+snd+")"); - synchronized (outputStream) { - outputStream.writeObject(snd); - outputStream.flush(); - nextPingTime = System.currentTimeMillis() + (keepAlive/2); - } + nextPingTime = System.currentTimeMillis() + (keepAlive/2); + outputStream.writeObject(snd); + outputStream.flush(); } private void receiveResp() { - synchronized (outputStream) { - waitForPing = false; - nextPingTime = System.currentTimeMillis() + (keepAlive/2); - outputStream.notifyAll(); - } + synchronized (pingLock) { + waitForPing = false; + nextPingTime = System.currentTimeMillis() + (keepAlive/2); + } } public void setCloseTimeout(long t) { @@ -421,6 +422,8 @@ public final class Client implements ISynchAsynchConnection { return aThread; } + //private int rcvCount = 0; + private void handleRcv(Serializable rcv) throws IOException { try { // We've received some data, so the connection is alive @@ -487,7 +490,7 @@ public final class Client implements ISynchAsynchConnection { break; // Check to see how long it has been since our last // send. - synchronized (outputStream) { + synchronized (pingLock) { if (System.currentTimeMillis() >= nextPingTime) { // If it's been longer than our timeout // interval, then ping @@ -498,8 +501,10 @@ public final class Client implements ISynchAsynchConnection { try { // Wait for keepAliveInterval for // pingresp - outputStream.wait(keepAlive / 2); + pingLock.wait(keepAlive / 2); } catch (InterruptedException e) { + dumpStack("PING INTERRUPTED",e); + return; } } if (waitForPing) { @@ -561,6 +566,7 @@ public final class Client implements ISynchAsynchConnection { queueObject(recipient, (Serializable) obj); } + // private int serverQueueCount = 0; public synchronized void queueObject(ID recipient, Serializable obj) throws IOException { if (queue.isStopped() || isClosing) @@ -594,9 +600,11 @@ public final class Client implements ISynchAsynchConnection { ret = (Serializable) inputStream.readObject(); } catch (ClassNotFoundException e) { dumpStack("readObject;classnotfoundexception", e); - throw new IOException( + IOException except = new IOException( "Protocol violation due to class load failure. " + e.getMessage()); + except.setStackTrace(e.getStackTrace()); + throw except; } return ret; } diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java index 9968f5c5f..6700223bb 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java @@ -15,8 +15,8 @@ */ package org.eclipse.ecf.provider.generic; -import org.eclipse.ecf.core.util.IEnqueuePredicate; import org.eclipse.ecf.core.util.Event; +import org.eclipse.ecf.core.util.IEnqueuePredicate; import org.eclipse.ecf.core.util.IQueueEnqueue; import org.eclipse.ecf.core.util.QueueException; import org.eclipse.ecf.core.util.SimpleQueueImpl; diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java index 41d49300c..3128bcc10 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java @@ -775,7 +775,7 @@ public abstract class SOContainer implements ISharedObjectContainer { .getData())); } catch (ClassNotFoundException e) { dumpStack( - "Classnotfoundexception in handleSharedObjectMessage", + "ClassNotFoundException in handleSharedObjectMessage", e); e.printStackTrace(System.err); } @@ -942,14 +942,18 @@ public abstract class SOContainer implements ISharedObjectContainer { try { Object obj = e.getData(); if (obj == null) { + System.out.println("NULL DATA IN EVENT: "+e); debug("Ignoring null data in event " + e); return; } if (!(obj instanceof byte[])) { debug("Ignoring event without valid data " + e); + System.out.println("NOT BYTE [] DATA IN EVENT: "+e); + return; } ContainerMessage mess = validateContainerMessage(deserializeContainerMessage((byte[]) obj)); if (mess == null) { + System.out.println("EVENT NOT VALIDATED: "+e); return; } Serializable submess = mess.getData(); diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java index 5d3a7b279..00cb8b446 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java @@ -14,6 +14,7 @@ package org.eclipse.ecf.provider.generic; import java.io.Serializable; import java.security.AccessController; import java.security.PrivilegedAction; + import org.eclipse.ecf.core.ISharedObject; import org.eclipse.ecf.core.SharedObjectInitException; import org.eclipse.ecf.core.events.RemoteSharedObjectCreateResponseEvent; diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java index 08342f38e..64d3e22af 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java @@ -97,6 +97,7 @@ public class ServerSOContainer extends SOContainer implements ISharedObjectConta } catch (IOException e) { logException("Exception in forwardExcluding from " + from + " with oldID " + oldID, e); + e.printStackTrace(); } } } |
