Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2005-08-04 19:27:09 +0000
committerslewis2005-08-04 19:27:09 +0000
commitb341f3f23e4cb0a54e1d161bb99e9469a1c8e016 (patch)
treef21ae822f1b0c0fec35a7f1095ef6e31df023579
parentb84998541d3bd2c7c618a3c5a96352f18cd33299 (diff)
downloadorg.eclipse.ecf-b341f3f23e4cb0a54e1d161bb99e9469a1c8e016.tar.gz
org.eclipse.ecf-b341f3f23e4cb0a54e1d161bb99e9469a1c8e016.tar.xz
org.eclipse.ecf-b341f3f23e4cb0a54e1d161bb99e9469a1c8e016.zip
Fix for timing bug in tcp communications code
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java48
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java2
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java6
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java1
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java1
5 files changed, 36 insertions, 22 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java
index 8b98cbdfa..928bb581e 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Vector;
+
import org.eclipse.ecf.core.comm.AsynchConnectionEvent;
import org.eclipse.ecf.core.comm.ConnectionDescription;
import org.eclipse.ecf.core.comm.ConnectionEvent;
@@ -105,6 +106,8 @@ public final class Client implements ISynchAsynchConnection {
protected ID containerID = null;
+ protected Object pingLock = new Object();
+
public Map getProperties() {
return properties;
}
@@ -187,6 +190,7 @@ public final class Client implements ISynchAsynchConnection {
retID = IDFactory.getDefault().makeStringID(PROTOCOL + "://"
+ socket.getLocalAddress().getHostName() + ":" + socket.getLocalPort());
} catch (Exception e) {
+ dumpStack("Exception in getLocalID()",e);
return null;
}
return retID;
@@ -241,7 +245,9 @@ public final class Client implements ISynchAsynchConnection {
try {
anURI = remote.toURI();
} catch (URISyntaxException e) {
- throw new IOException("Invalid URL");
+ IOException except = new IOException("Invalid URI for remote "+remote);
+ except.setStackTrace(e.getStackTrace());
+ throw except;
}
// Get socket factory and create/connect socket
SocketFactory fact = SocketFactory.getSocketFactory();
@@ -292,29 +298,27 @@ public final class Client implements ISynchAsynchConnection {
public void run() {
int msgCount = 0;
Thread me = Thread.currentThread();
- // Loop until done sending messages
+ // Loop until done sending messages (thread explicitly interrupted or queue.peekQueue() returns null
for (;;) {
if (me.isInterrupted())
break;
+ // sender should wait here until something appears in queue or queue is stopped (returns null)
Serializable aMsg = (Serializable) queue.peekQueue();
if (me.isInterrupted() || aMsg == null)
break;
try {
// Actually send message
- trace("send:" + aMsg);
sendIt(aMsg);
// Successful...remove message from queue
queue.removeHead();
if (msgCount > maxMsg) {
- synchronized (outputStream) {
- outputStream.reset();
- }
+ outputStream.reset();
msgCount = 0;
} else
msgCount++;
+
} catch (IOException e) {
if (isClosing) {
- //isClosing = false;
dumpStack("SENDER CLOSING",e);
synchronized (Client.this) {
Client.this.notifyAll();
@@ -353,19 +357,16 @@ public final class Client implements ISynchAsynchConnection {
private void sendIt(Serializable snd) throws IOException {
// Write object to output stream
trace("sendIt("+snd+")");
- synchronized (outputStream) {
- outputStream.writeObject(snd);
- outputStream.flush();
- nextPingTime = System.currentTimeMillis() + (keepAlive/2);
- }
+ nextPingTime = System.currentTimeMillis() + (keepAlive/2);
+ outputStream.writeObject(snd);
+ outputStream.flush();
}
private void receiveResp() {
- synchronized (outputStream) {
- waitForPing = false;
- nextPingTime = System.currentTimeMillis() + (keepAlive/2);
- outputStream.notifyAll();
- }
+ synchronized (pingLock) {
+ waitForPing = false;
+ nextPingTime = System.currentTimeMillis() + (keepAlive/2);
+ }
}
public void setCloseTimeout(long t) {
@@ -421,6 +422,8 @@ public final class Client implements ISynchAsynchConnection {
return aThread;
}
+ //private int rcvCount = 0;
+
private void handleRcv(Serializable rcv) throws IOException {
try {
// We've received some data, so the connection is alive
@@ -487,7 +490,7 @@ public final class Client implements ISynchAsynchConnection {
break;
// Check to see how long it has been since our last
// send.
- synchronized (outputStream) {
+ synchronized (pingLock) {
if (System.currentTimeMillis() >= nextPingTime) {
// If it's been longer than our timeout
// interval, then ping
@@ -498,8 +501,10 @@ public final class Client implements ISynchAsynchConnection {
try {
// Wait for keepAliveInterval for
// pingresp
- outputStream.wait(keepAlive / 2);
+ pingLock.wait(keepAlive / 2);
} catch (InterruptedException e) {
+ dumpStack("PING INTERRUPTED",e);
+ return;
}
}
if (waitForPing) {
@@ -561,6 +566,7 @@ public final class Client implements ISynchAsynchConnection {
queueObject(recipient, (Serializable) obj);
}
+ // private int serverQueueCount = 0;
public synchronized void queueObject(ID recipient, Serializable obj)
throws IOException {
if (queue.isStopped() || isClosing)
@@ -594,9 +600,11 @@ public final class Client implements ISynchAsynchConnection {
ret = (Serializable) inputStream.readObject();
} catch (ClassNotFoundException e) {
dumpStack("readObject;classnotfoundexception", e);
- throw new IOException(
+ IOException except = new IOException(
"Protocol violation due to class load failure. "
+ e.getMessage());
+ except.setStackTrace(e.getStackTrace());
+ throw except;
}
return ret;
}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java
index 9968f5c5f..6700223bb 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/QueueEnqueueImpl.java
@@ -15,8 +15,8 @@
*/
package org.eclipse.ecf.provider.generic;
-import org.eclipse.ecf.core.util.IEnqueuePredicate;
import org.eclipse.ecf.core.util.Event;
+import org.eclipse.ecf.core.util.IEnqueuePredicate;
import org.eclipse.ecf.core.util.IQueueEnqueue;
import org.eclipse.ecf.core.util.QueueException;
import org.eclipse.ecf.core.util.SimpleQueueImpl;
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
index 41d49300c..3128bcc10 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
@@ -775,7 +775,7 @@ public abstract class SOContainer implements ISharedObjectContainer {
.getData()));
} catch (ClassNotFoundException e) {
dumpStack(
- "Classnotfoundexception in handleSharedObjectMessage",
+ "ClassNotFoundException in handleSharedObjectMessage",
e);
e.printStackTrace(System.err);
}
@@ -942,14 +942,18 @@ public abstract class SOContainer implements ISharedObjectContainer {
try {
Object obj = e.getData();
if (obj == null) {
+ System.out.println("NULL DATA IN EVENT: "+e);
debug("Ignoring null data in event " + e);
return;
}
if (!(obj instanceof byte[])) {
debug("Ignoring event without valid data " + e);
+ System.out.println("NOT BYTE [] DATA IN EVENT: "+e);
+ return;
}
ContainerMessage mess = validateContainerMessage(deserializeContainerMessage((byte[]) obj));
if (mess == null) {
+ System.out.println("EVENT NOT VALIDATED: "+e);
return;
}
Serializable submess = mess.getData();
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java
index 5d3a7b279..00cb8b446 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOWrapper.java
@@ -14,6 +14,7 @@ package org.eclipse.ecf.provider.generic;
import java.io.Serializable;
import java.security.AccessController;
import java.security.PrivilegedAction;
+
import org.eclipse.ecf.core.ISharedObject;
import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.events.RemoteSharedObjectCreateResponseEvent;
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java
index 08342f38e..64d3e22af 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/ServerSOContainer.java
@@ -97,6 +97,7 @@ public class ServerSOContainer extends SOContainer implements ISharedObjectConta
} catch (IOException e) {
logException("Exception in forwardExcluding from "
+ from + " with oldID " + oldID, e);
+ e.printStackTrace();
}
}
}

Back to the top