Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2005-06-26 22:26:55 +0000
committerslewis2005-06-26 22:26:55 +0000
commit499967802ecea039cd8156166fbe1e8fa583a057 (patch)
treea808feb9bf577ae70f84d98e39f240be46122e2a
parentd094649c4a71ac553f00b11102c0e11027e7bca6 (diff)
downloadorg.eclipse.ecf-499967802ecea039cd8156166fbe1e8fa583a057.tar.gz
org.eclipse.ecf-499967802ecea039cd8156166fbe1e8fa583a057.tar.xz
org.eclipse.ecf-499967802ecea039cd8156166fbe1e8fa583a057.zip
Added code to support IConnectionEventHandler.getEventHandlerID. Fixed tracing and increased tracing support in Client comm/tcp code.
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/.options1
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java2
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java160
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java4
4 files changed, 102 insertions, 65 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/.options b/framework/bundles/org.eclipse.ecf.provider/.options
index 19965ce51..5b80749fb 100644
--- a/framework/bundles/org.eclipse.ecf.provider/.options
+++ b/framework/bundles/org.eclipse.ecf.provider/.options
@@ -2,7 +2,6 @@ org.eclipse.ecf.provider/debug = true
org.eclipse.ecf.provider/debug/filter = *
org.eclipse.ecf.provider/debug/flag = true
org.eclipse.ecf.provider/debug/connection = true
-org.eclipse.ecf.provider/debug/connectionping = false
org.eclipse.ecf.provider/debug/container = true
org.eclipse.ecf.provider/debug/sharedobjectwrapper = true
org.eclipse.ecf.provider/debug/sharedobjectcontext = true
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java
index 8a228c7d6..2f77df1c4 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/Trace.java
@@ -18,7 +18,7 @@ import org.eclipse.core.runtime.Platform;
public class Trace {
public static final String tracePrefix = "(trace)";
- public static boolean ON = false;
+ public static boolean ON = true;
protected static boolean isEclipse = false;
protected static String pluginName = "";
protected static String debugPrefix = "/debug/";
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java
index 08a715513..44161cc76 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/comm/tcp/Client.java
@@ -18,6 +18,7 @@ import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ConnectException;
import java.net.Socket;
+import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
@@ -25,6 +26,7 @@ import java.security.PrivilegedAction;
import java.util.Enumeration;
import java.util.Map;
import java.util.Properties;
+import java.util.Random;
import java.util.Vector;
import org.eclipse.ecf.core.comm.AsynchConnectionEvent;
import org.eclipse.ecf.core.comm.ConnectionDescription;
@@ -72,17 +74,19 @@ public final class Client implements ISynchAsynchConnection {
}
public static final String PROTOCOL = "ecftcp";
- public static final Trace debug = Trace.create("connection");
+ protected static final Trace trace = Trace.create("connection");
public static final int SNDR_PRIORITY = Thread.NORM_PRIORITY;
public static final int RCVR_PRIORITY = Thread.NORM_PRIORITY;
// Default close timeout is 1.5 seconds
- public static final long CLOSE_TIMEOUT = 1500;
+ public static final long CLOSE_TIMEOUT = 2000;
public static final int DEF_MAX_MSG = 50;
- protected String address;
- protected int port;
+
protected Socket socket;
+ private String addressPort = "-1:<no endpoint>:-1";
+
protected ObjectOutputStream outputStream;
protected ObjectInputStream inputStream;
+
protected ISynchAsynchConnectionEventHandler handler;
protected SimpleQueueImpl queue = new SimpleQueueImpl();
protected int keepAlive = 0;
@@ -98,22 +102,27 @@ public final class Client implements ISynchAsynchConnection {
protected long closeTimeout = CLOSE_TIMEOUT;
protected Vector eventNotify = null;
protected Map properties;
-
+
+ protected ID containerID = null;
+
public Map getProperties() {
return properties;
}
public Object getAdapter(Class clazz) {
return null;
}
- protected void debug(String msg) {
- if (Trace.ON && debug != null) {
- debug.msg(msg);
+ private String getAddressPort() {
+ return addressPort;
+ }
+ protected void trace(String msg) {
+ if (Trace.ON && trace != null) {
+ trace.msg(getAddressPort()+";"+msg);
}
}
protected void dumpStack(String msg, Throwable e) {
- if (Trace.ON && debug != null) {
- debug.dumpStack(e, msg);
+ if (Trace.ON && trace != null) {
+ trace.dumpStack(e, getAddressPort()+";"+msg);
}
}
@@ -127,19 +136,25 @@ public final class Client implements ISynchAsynchConnection {
throws IOException {
this(aSocket, iStream, oStream, handler, keepAlive, DEF_MAX_MSG);
}
-
+ private void setSocket(Socket s) throws SocketException {
+ socket = s;
+ if (s != null) {
+ // Set socket options
+ s.setTcpNoDelay(true);
+ addressPort = s.getLocalPort()+":"+s.getInetAddress().getHostName()+":"+s.getPort();
+ } else {
+ addressPort = "-1:<no endpoint>:-1";
+ }
+ }
public Client(Socket aSocket, ObjectInputStream iStream,
ObjectOutputStream oStream,
ISynchAsynchConnectionEventHandler handler, int keepAlive,
int maxmsgs) throws IOException {
- socket = aSocket;
- // Set TCP no delay
- socket.setTcpNoDelay(true);
- address = socket.getInetAddress().getHostName();
- port = socket.getPort();
+ setSocket(socket);
inputStream = iStream;
outputStream = oStream;
this.handler = handler;
+ containerID = handler.getEventHandlerID();
this.keepAlive = keepAlive;
maxMsg = maxmsgs;
properties = new Properties();
@@ -157,18 +172,20 @@ public final class Client implements ISynchAsynchConnection {
public Client(ISynchAsynchConnectionEventHandler handler, int keepAlive,
int maxmsgs) {
this.handler = handler;
+ containerID = handler.getEventHandlerID();
this.keepAlive = keepAlive;
maxMsg = maxmsgs;
this.properties = new Properties();
}
public synchronized ID getLocalID() {
+ if (containerID != null) return containerID;
if (socket == null)
return null;
ID retID = null;
try {
retID = IDFactory.makeStringID(PROTOCOL + "://"
- + socket.getLocalAddress().getHostName() + ":" + port);
+ + socket.getLocalAddress().getHostName() + ":" + socket.getLocalPort());
} catch (Exception e) {
return null;
}
@@ -216,48 +233,46 @@ public final class Client implements ISynchAsynchConnection {
public synchronized Object connect(ID remote, Object data, int timeout)
throws IOException {
- debug("connect(" + remote + "," + data + "," + timeout + ")");
+ trace("connect(" + remote + "," + data + "," + timeout + ")");
if (socket != null)
- throw new ConnectException("ClientApplication already connected");
+ throw new ConnectException("Already connected to "+getAddressPort());
+ // parse URI
URI anURI = null;
try {
anURI = remote.toURI();
} catch (URISyntaxException e) {
- throw new IOException("Can't connect to address "
- + remote.getName() + ". Invalid URL");
+ throw new IOException("Invalid URL");
}
- address = anURI.getHost();
- port = anURI.getPort();
+ // Get socket factory and create/connect socket
SocketFactory fact = SocketFactory.getSocketFactory();
if (fact == null) {
fact = SocketFactory.getDefaultSocketFactory();
}
- debug("socket connecting to " + address + ":" + port);
- // Actually connect to remote using socket from socket factory.
- socket = fact.createSocket(address, port, timeout);
- // Set TCP no delay
- socket.setTcpNoDelay(true);
- boolean compatibility = false;
- //boolean compatibility = TCPCompatibility.useCompatibility;
- outputStream = new ExObjectOutputStream(new BufferedOutputStream(socket
- .getOutputStream()), compatibility);
+ Socket s = fact.createSocket(anURI.getHost(), anURI.getPort(), timeout);
+ // Now we've got a connection so set our socket
+ setSocket(s);
+ outputStream = new ExObjectOutputStream(new BufferedOutputStream(s
+ .getOutputStream()), false);
outputStream.flush();
- inputStream = new ExObjectInputStream(socket.getInputStream(),
- compatibility);
- // send connect data
- debug("send:" + address + ":" + port + ":" + data);
+ inputStream = new ExObjectInputStream(s.getInputStream(),
+ false);
+ trace("connect;" + anURI);
+ // send connect data and get syncronous response
sendIt(new ConnectRequestMessage(anURI, (Serializable) data));
ConnectResultMessage res = null;
res = (ConnectResultMessage) readObject();
- debug("recv:" + address + ":" + port + ":" + res);
+ trace("connect;rcv:"+ res);
// Setup threads
setupThreads();
// Return results.
- return res.getData();
+ Object ret = res.getData();
+ trace("connect;returning:"+ret);
+ return ret;
}
private void setupThreads() {
// Setup threads
+ trace("setupThreads()");
sendThread = (Thread) AccessController
.doPrivileged(new PrivilegedAction() {
public Object run() {
@@ -286,7 +301,7 @@ public final class Client implements ISynchAsynchConnection {
break;
try {
// Actually send message
- debug("send:" + address + ":" + port + ":" + aMsg);
+ trace("send:" + aMsg);
sendIt(aMsg);
// Successful...remove message from queue
queue.removeHead();
@@ -299,11 +314,13 @@ public final class Client implements ISynchAsynchConnection {
msgCount++;
} catch (IOException e) {
if (isClosing) {
- isClosing = false;
+ //isClosing = false;
+ dumpStack("SENDER CLOSING",e);
synchronized (Client.this) {
Client.this.notifyAll();
}
} else {
+ dumpStack("SENDER EXCEPTION",e);
if (!handler.handleSuspectEvent(new ConnectionEvent(
Client.this, e))) {
handler
@@ -314,9 +331,9 @@ public final class Client implements ISynchAsynchConnection {
break;
}
}
- debug("sender:" + address + ":" + port + " terminating");
+ trace("SENDER TERMINATING");
}
- }, "sndr:" + address + ":" + port);
+ }, getLocalID() + ":sndr:" + getAddressPort());
// Set priority for new thread
aThread.setPriority(SNDR_PRIORITY);
return aThread;
@@ -324,25 +341,29 @@ public final class Client implements ISynchAsynchConnection {
private void closeSocket() {
try {
- if (socket != null)
+ if (socket != null) {
socket.close();
+ setSocket(null);
+ }
} catch (IOException e) {
+ dumpStack("socket close",e);
}
}
private void sendIt(Serializable snd) throws IOException {
// Write object to output stream
+ trace("sendIt("+snd+")");
synchronized (outputStream) {
outputStream.writeObject(snd);
outputStream.flush();
- nextPingTime = System.currentTimeMillis() + keepAlive;
+ nextPingTime = System.currentTimeMillis() + (keepAlive/2);
}
}
private void receiveResp() {
synchronized (outputStream) {
waitForPing = false;
- nextPingTime = System.currentTimeMillis() + keepAlive;
+ nextPingTime = System.currentTimeMillis() + (keepAlive/2);
outputStream.notifyAll();
}
}
@@ -353,17 +374,15 @@ public final class Client implements ISynchAsynchConnection {
private void sendClose(Serializable snd) throws IOException {
isClosing = true;
- debug("send:" + address + ":" + port + ":" + snd);
+ trace("sendClose(" + snd + ")");
sendIt(snd);
if (isClosing) {
try {
wait(closeTimeout);
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ dumpStack("sendClose wait",e);
}
}
- // Before returning, actually remove remote objects
- //handler.handleDisconnectEvent(new DisconnectConnectionEvent(
- //ClientApplication.this, null, queue));
}
private Thread getRcvThread() {
@@ -378,11 +397,12 @@ public final class Client implements ISynchAsynchConnection {
handleRcv(readObject());
} catch (IOException e) {
if (isClosing) {
- isClosing = false;
+ dumpStack("RCVR CLOSING",e);
synchronized (Client.this) {
Client.this.notifyAll();
}
} else {
+ dumpStack("RCVR EXCEPTION",e);
if (!handler.handleSuspectEvent(new ConnectionEvent(
Client.this, e))) {
handler
@@ -393,9 +413,9 @@ public final class Client implements ISynchAsynchConnection {
break;
}
}
- debug("read:" + address + ":" + port + " terminating");
+ trace("RCVR TERMINATING");
}
- }, "rcvr:" + address + ":" + port);
+ }, getLocalID() + ":rcvr:" + getAddressPort());
// Set priority and return
aThread.setPriority(RCVR_PRIORITY);
return aThread;
@@ -409,16 +429,17 @@ public final class Client implements ISynchAsynchConnection {
if (rcv instanceof SynchMessage) {
// Handle synch message. The only valid synch message is
// 'close'.
- debug("recv:" + address + ":" + port + ":" + rcv);
+ trace("recv:" + rcv);
handler.handleSynchEvent(new SynchConnectionEvent(this,
((SynchMessage) rcv).getData()));
} else if (rcv instanceof AsynchMessage) {
- debug("recv:" + address + ":" + port + ":" + rcv);
+ trace("recv:" + rcv);
Serializable d = ((AsynchMessage) rcv).getData();
// Handle asynch messages.
handler.handleAsynchEvent(new AsynchConnectionEvent(this, d));
} else if (rcv instanceof PingMessage) {
// Handle ping by sending response back immediately
+ trace("recv:" + rcv);
sendIt(pingResp);
} else if (rcv instanceof PingResponseMessage) {
// Do nothing with ping response
@@ -429,9 +450,8 @@ public final class Client implements ISynchAsynchConnection {
throw e;
}
}
-
public synchronized void start() {
- debug("start(" + address + ":" + port + ")");
+ trace("start()");
if (sendThread != null)
sendThread.start();
if (rcvThread != null)
@@ -444,12 +464,21 @@ public final class Client implements ISynchAsynchConnection {
}
public void stop() {
+ trace("stop()");
}
private Thread setupPing() {
+ trace("setupPing()");
+ final int pingStartWait = (new Random()).nextInt(keepAlive / 2);
return new Thread(new Runnable() {
public void run() {
Thread me = Thread.currentThread();
+ // Sleep a random interval to start
+ try {
+ Thread.sleep(pingStartWait);
+ } catch (InterruptedException e) {
+ return;
+ }
while (!queue.isStopped()) {
try {
if (me.isInterrupted())
@@ -476,18 +505,18 @@ public final class Client implements ISynchAsynchConnection {
}
}
if (waitForPing) {
- throw new IOException(address + ":" + port
- + " not reachable.");
+ throw new IOException(getAddressPort()+ " not reachable");
}
}
}
} catch (Exception e) {
if (isClosing) {
- isClosing = false;
+ dumpStack("PING CLOSING",e);
synchronized (Client.this) {
Client.this.notifyAll();
}
} else {
+ dumpStack("PING EXCEPTION",e);
if (!handler.handleSuspectEvent(new ConnectionEvent(
Client.this, e))) {
handler
@@ -498,12 +527,13 @@ public final class Client implements ISynchAsynchConnection {
break;
}
}
+ trace("PING TERMINATING");
}
- }, "ping:" + address + ":" + port);
+ }, getLocalID()+":ping:"+getAddressPort());
}
public synchronized void disconnect() throws IOException {
- debug("disconnect(" + address + ":" + port + ")");
+ trace("disconnect()");
// Close send queue and socket
queue.close();
closeSocket();
@@ -537,6 +567,7 @@ public final class Client implements ISynchAsynchConnection {
throws IOException {
if (queue.isStopped() || isClosing)
throw new ConnectException("Not connected");
+ trace("queueObject("+recipient+","+obj+")");
queue.enqueue(new AsynchMessage(obj));
}
@@ -544,15 +575,18 @@ public final class Client implements ISynchAsynchConnection {
throws IOException {
if (queue.isStopped() || isClosing)
throw new ConnectException("Not connected");
+ trace("queueObject("+recipient+","+obj+")");
sendClose(new SynchMessage(obj));
return null;
}
public Object sendSynch(ID rec, Object obj) throws IOException {
+ trace("sendSynch("+rec+","+obj+")");
return sendObject(rec, (Serializable) obj);
}
public Object sendSynch(ID rec, byte[] obj) throws IOException {
+ trace("sendSynch("+rec+","+obj+")");
return sendObject(rec, obj);
}
@@ -561,7 +595,7 @@ public final class Client implements ISynchAsynchConnection {
try {
ret = (Serializable) inputStream.readObject();
} catch (ClassNotFoundException e) {
- dumpStack("ClassNotFoundException", e);
+ dumpStack("readObject;classnotfoundexception", e);
throw new IOException(
"Protocol violation due to class load failure. "
+ e.getMessage());
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
index 7c5923d23..9dd9f5896 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
@@ -226,6 +226,10 @@ public abstract class SOContainer implements ISharedObjectContainer {
throws IOException {
return processSynch(event);
}
+
+ public ID getEventHandlerID() {
+ return getID();
+ }
}
static Trace debug = Trace.create("container");
public static final String DEFAULT_OBJECT_ARG_KEY = SOContainer.class

Back to the top