Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2006-10-23 13:41:10 -0400
committerEike Stepper2006-10-23 13:41:10 -0400
commita95acf5fcc86191fe51a7e0c8e899bd4614462df (patch)
tree2ceff92429cce2fa7cdeb5270432c22c583b7916 /plugins/org.eclipse.net4j
parent697d6277d2715b06e41abc790ab8b053df1520c7 (diff)
downloadcdo-a95acf5fcc86191fe51a7e0c8e899bd4614462df.tar.gz
cdo-a95acf5fcc86191fe51a7e0c8e899bd4614462df.tar.xz
cdo-a95acf5fcc86191fe51a7e0c8e899bd4614462df.zip
Signal protocol
Diffstat (limited to 'plugins/org.eclipse.net4j')
-rw-r--r--plugins/org.eclipse.net4j/.settings/org.eclipse.jdt.ui.prefs3
-rw-r--r--plugins/org.eclipse.net4j/META-INF/MANIFEST.MF1
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java9
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferFactoryImpl.java5
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java59
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferPoolImpl.java11
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java2
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java38
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java47
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java41
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java49
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java58
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java2
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/StrictSignalActor.java53
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/StrictSignalReactor.java50
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/HexUtil.java45
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/ReflectUtil.java185
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/lifecycle/AbstractLifecycle.java25
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferInputStream.java12
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferOutputStream.java11
20 files changed, 663 insertions, 43 deletions
diff --git a/plugins/org.eclipse.net4j/.settings/org.eclipse.jdt.ui.prefs b/plugins/org.eclipse.net4j/.settings/org.eclipse.jdt.ui.prefs
new file mode 100644
index 0000000000..b4b4193774
--- /dev/null
+++ b/plugins/org.eclipse.net4j/.settings/org.eclipse.jdt.ui.prefs
@@ -0,0 +1,3 @@
+#Mon Oct 23 07:25:57 CEST 2006
+eclipse.preferences.version=1
+internal.default.compliance=default
diff --git a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
index 7294a1e280..da8d7fac78 100644
--- a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
@@ -14,6 +14,7 @@ Export-Package: org.eclipse.internal.net4j.transport;version="0.8.0",
org.eclipse.net4j.signal;version="0.8.0",
org.eclipse.net4j.transport;version="0.8.0",
org.eclipse.net4j.transport.tcp;version="0.8.0",
+ org.eclipse.net4j.util;version="0.8.0",
org.eclipse.net4j.util.concurrent;version="0.8.0",
org.eclipse.net4j.util.lifecycle;version="0.8.0",
org.eclipse.net4j.util.map;version="0.8.0",
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java
index cbf6991284..acaa90cc28 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java
@@ -38,7 +38,14 @@ import java.util.concurrent.TimeUnit;
public abstract class AbstractConnector extends AbstractLifecycle implements Connector,
BufferProvider
{
- private static final ChannelImpl NULL_CHANNEL = new ChannelImpl(null);
+ private static final ChannelImpl NULL_CHANNEL = new ChannelImpl(null)
+ {
+ @Override
+ public String toString()
+ {
+ return "NullChannel";
+ }
+ };
private ConnectorCredentials credentials;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferFactoryImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferFactoryImpl.java
index 43ed786cd4..e270ec76c7 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferFactoryImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferFactoryImpl.java
@@ -25,8 +25,9 @@ public class BufferFactoryImpl extends BufferProviderImpl
@Override
protected Buffer doProvideBuffer()
{
- System.out.println(toString() + ": Creating buffer of capacity " + getBufferCapacity());
- return new BufferImpl(this, getBufferCapacity());
+ BufferImpl buffer = new BufferImpl(this, getBufferCapacity());
+ System.out.println(toString() + ": Created " + buffer);
+ return buffer;
}
@Override
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java
index 1f04a58c39..bf43de37c6 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java
@@ -12,6 +12,8 @@ package org.eclipse.internal.net4j.transport;
import org.eclipse.net4j.transport.Buffer;
import org.eclipse.net4j.transport.BufferProvider;
+import org.eclipse.net4j.util.HexUtil;
+import org.eclipse.net4j.util.ReflectUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -27,6 +29,8 @@ public class BufferImpl implements Buffer
public static final short NO_CHANNEL = Short.MIN_VALUE;
+ public static boolean TRACE = false;
+
private static final int EOS_OFFSET = 1;
private BufferProvider bufferProvider;
@@ -156,8 +160,6 @@ public class BufferImpl implements Buffer
}
payloadSize -= EOS_OFFSET;
- System.out.println(toString() + ": Read " + (HEADER_SIZE + payloadSize) + " bytes"
- + (eos ? " (EOS)" : ""));
byteBuffer.clear();
byteBuffer.limit(payloadSize);
@@ -175,6 +177,13 @@ public class BufferImpl implements Buffer
return null;
}
+ if (TRACE)
+ {
+ System.out.println(toString() + ": Read " + byteBuffer.limit() + " bytes"
+ + (eos ? " (EOS)" : ""));
+ System.out.println(formatContent());
+ }
+
byteBuffer.flip();
state = State.GETTING;
return byteBuffer;
@@ -212,7 +221,7 @@ public class BufferImpl implements Buffer
throw new IllegalStateException("state == " + state);
}
- if (state != State.WRITING)
+ if (state == State.PUTTING)
{
if (channelID == NO_CHANNEL)
{
@@ -225,6 +234,13 @@ public class BufferImpl implements Buffer
payloadSize = -payloadSize;
}
+ if (TRACE)
+ {
+ System.out.println(toString() + ": Writing " + (payloadSize - 1) + " bytes"
+ + (eos ? " (EOS)" : ""));
+ System.out.println(formatContent());
+ }
+
byteBuffer.flip();
byteBuffer.putShort(channelID);
byteBuffer.putShort((short)payloadSize);
@@ -232,7 +248,8 @@ public class BufferImpl implements Buffer
state = State.WRITING;
}
- if (socketChannel.write(byteBuffer) == -1)
+ int numBytes = socketChannel.write(byteBuffer);
+ if (numBytes == -1)
{
throw new IOException("Channel closed");
}
@@ -242,8 +259,6 @@ public class BufferImpl implements Buffer
return false;
}
- System.out.println(toString() + ": Wrote " + byteBuffer.limit() + " bytes"
- + (eos ? " (EOS)" : ""));
clear();
return true;
}
@@ -251,7 +266,37 @@ public class BufferImpl implements Buffer
@Override
public String toString()
{
- return "Buffer[channelID=" + channelID + ", state=" + state + "]";
+ return "Buffer@" + ReflectUtil.getID(this);
+ }
+
+ public String formatContent()
+ {
+ final int oldPosition = byteBuffer.position();
+ final int oldLimit = byteBuffer.limit();
+
+ try
+ {
+ byteBuffer.flip();
+ if (state == State.PUTTING)
+ {
+ byteBuffer.position(HEADER_SIZE);
+ }
+
+ StringBuilder builder = new StringBuilder();
+ while (byteBuffer.hasRemaining())
+ {
+ byte b = byteBuffer.get();
+ HexUtil.appendHex(builder, b < 0 ? ~b : b);
+ builder.append(' ');
+ }
+
+ return builder.toString();
+ }
+ finally
+ {
+ byteBuffer.position(oldPosition);
+ byteBuffer.limit(oldLimit);
+ }
}
/**
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferPoolImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferPoolImpl.java
index 3f3b254b9a..c2147e0535 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferPoolImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferPoolImpl.java
@@ -48,7 +48,7 @@ public class BufferPoolImpl extends BufferProviderImpl implements BufferPool,
return false;
}
- System.out.println(toString() + ": Evicting buffer");
+ System.out.println(toString() + ": Evicting " + buffer);
factory.retainBuffer(buffer);
--pooledBuffers;
return true;
@@ -76,17 +76,14 @@ public class BufferPoolImpl extends BufferProviderImpl implements BufferPool,
protected Buffer doProvideBuffer()
{
Buffer buffer = queue.poll();
- if (buffer != null)
- {
- System.out.println(toString() + ": Obtaining buffer");
- }
- else
+ if (buffer == null)
{
buffer = factory.provideBuffer();
((BufferImpl)buffer).setBufferProvider(this);
}
buffer.clear();
+ System.out.println(toString() + ": Obtained " + buffer);
return buffer;
}
@@ -98,7 +95,7 @@ public class BufferPoolImpl extends BufferProviderImpl implements BufferPool,
throw new IllegalArgumentException("buffer.getCapacity() != getBufferCapacity()");
}
- System.out.println(toString() + ": Retaining buffer");
+ System.out.println(toString() + ": Retaining " + buffer);
queue.add(buffer);
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java
index 6aaacc8f80..8924e2e07b 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java
@@ -156,7 +156,7 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
@Override
public String toString()
{
- return "Channel[" + connector + ":" + channelID + "]";
+ return "Channel[" + connector + ", channelID=" + channelID + "]";
}
@Override
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java
new file mode 100644
index 0000000000..fb7931fd15
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java
@@ -0,0 +1,38 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.util.stream.BufferInputStream;
+import org.eclipse.net4j.util.stream.BufferOutputStream;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * @author Eike Stepper
+ */
+public abstract class Indication extends StrictSignalReactor
+{
+ protected Indication()
+ {
+ }
+
+ @Override
+ protected final void execute(BufferInputStream in, BufferOutputStream out) throws Exception
+ {
+ System.out.println("================ Indicating " + this);
+ inputAllowed = true;
+ indicating(getDataInputStream());
+ inputAllowed = false;
+ }
+
+ protected abstract void indicating(DataInputStream in) throws IOException;
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java
new file mode 100644
index 0000000000..5c1da7e1f3
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java
@@ -0,0 +1,47 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.util.stream.BufferInputStream;
+import org.eclipse.net4j.util.stream.BufferOutputStream;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * @author Eike Stepper
+ */
+public abstract class IndicationWithResponse extends StrictSignalReactor
+{
+ protected IndicationWithResponse()
+ {
+ }
+
+ @Override
+ protected final void execute(BufferInputStream in, BufferOutputStream out) throws Exception
+ {
+ System.out.println("================ Indicating " + this);
+ inputAllowed = true;
+ indicating(getDataInputStream());
+ inputAllowed = false;
+
+ System.out.println("================ Responding " + this);
+ outputAllowed = true;
+ responding(getDataOutputStream());
+ outputAllowed = false;
+ out.flush();
+ }
+
+ protected abstract void indicating(DataInputStream in) throws IOException;
+
+ protected abstract void responding(DataOutputStream out) throws IOException;
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java
new file mode 100644
index 0000000000..6e0dc19ce5
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java
@@ -0,0 +1,41 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.transport.Channel;
+import org.eclipse.net4j.util.stream.BufferInputStream;
+import org.eclipse.net4j.util.stream.BufferOutputStream;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * @author Eike Stepper
+ */
+public abstract class Request<RESULT> extends StrictSignalActor<RESULT>
+{
+ protected Request(Channel channel)
+ {
+ super(channel);
+ }
+
+ @Override
+ protected final void execute(BufferInputStream in, BufferOutputStream out) throws Exception
+ {
+ System.out.println("================ Requesting " + this);
+ outputAllowed = true;
+ requesting(getDataOutputStream());
+ outputAllowed = false;
+ out.flush();
+ }
+
+ protected abstract void requesting(DataOutputStream out) throws IOException;
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
new file mode 100644
index 0000000000..281bb368be
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
@@ -0,0 +1,49 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.transport.Channel;
+import org.eclipse.net4j.util.stream.BufferInputStream;
+import org.eclipse.net4j.util.stream.BufferOutputStream;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * @author Eike Stepper
+ */
+public abstract class RequestWithConfirmation<RESULT> extends StrictSignalActor<RESULT>
+{
+ protected RequestWithConfirmation(Channel channel)
+ {
+ super(channel);
+ }
+
+ @Override
+ protected final void execute(BufferInputStream in, BufferOutputStream out) throws Exception
+ {
+ System.out.println("================ Requesting " + this);
+ outputAllowed = true;
+ requesting(getDataOutputStream());
+ outputAllowed = false;
+ out.flush();
+
+ System.out.println("================ Confirming " + this);
+ inputAllowed = true;
+ setResult(confirming(getDataInputStream()));
+ inputAllowed = false;
+ }
+
+ protected abstract void requesting(DataOutputStream out) throws IOException;
+
+ protected abstract RESULT confirming(DataInputStream in) throws IOException;
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
index 2706d6c454..f90b68ff89 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
@@ -35,7 +35,7 @@ public abstract class SignalProtocol extends AbstractProtocol
private Map<Integer, Signal> signals = new ConcurrentHashMap();
- private int nextCorrelationID = 0;
+ private int nextCorrelationID = 1;
protected SignalProtocol(Channel channel, ExecutorService executorService)
{
@@ -58,21 +58,36 @@ public abstract class SignalProtocol extends AbstractProtocol
{
ByteBuffer byteBuffer = buffer.getByteBuffer();
int correlationID = byteBuffer.getInt();
- System.out.println("Received buffer for signal " + correlationID);
+ System.out.println(toString() + ": Received buffer for correlation " + correlationID);
- Signal signal = signals.get(correlationID);
- if (signal == null)
+ Signal signal;
+ if (correlationID > 0)
{
- short signalID = byteBuffer.getShort();
- System.out.println("Got signal id " + signalID);
-
- signal = createSignalReactor(signalID);
- signal.setProtocol(this);
- signal.setCorrelationID(correlationID);
- signal.setInputStream(createInputStream());
- signal.setOutputStream(createOutputStream(correlationID, signalID));
- signals.put(correlationID, signal);
- executorService.execute(signal);
+ // Incoming indication
+ signal = signals.get(-correlationID);
+ if (signal == null)
+ {
+ short signalID = byteBuffer.getShort();
+ System.out.println(toString() + ": Got signal id " + signalID);
+
+ signal = createSignalReactor(signalID);
+ signal.setProtocol(this);
+ signal.setCorrelationID(-correlationID);
+ signal.setInputStream(createInputStream());
+ signal.setOutputStream(createOutputStream(-correlationID, signalID, false));
+ signals.put(-correlationID, signal);
+ executorService.execute(signal);
+ }
+ }
+ else
+ {
+ // Incoming confirmation
+ signal = signals.get(-correlationID);
+ if (signal == null)
+ {
+ System.out.println(toString() + ": Discarding buffer");
+ buffer.release();
+ }
}
signal.getInputStream().handleBuffer(buffer);
@@ -96,7 +111,7 @@ public abstract class SignalProtocol extends AbstractProtocol
short signalID = signalActor.getSignalID();
int correlationID = signalActor.getCorrelationID();
signalActor.setInputStream(createInputStream());
- signalActor.setOutputStream(createOutputStream(correlationID, signalID));
+ signalActor.setOutputStream(createOutputStream(correlationID, signalID, true));
signals.put(correlationID, signalActor);
signalActor.run();
}
@@ -128,9 +143,9 @@ public abstract class SignalProtocol extends AbstractProtocol
return new BufferInputStream();
}
- BufferOutputStream createOutputStream(int correlationID, short signalID)
+ BufferOutputStream createOutputStream(int correlationID, short signalID, boolean addSignalID)
{
- return new SignalOutputStream(correlationID, signalID);
+ return new SignalOutputStream(correlationID, signalID, addSignalID);
}
class SignalInputStream extends BufferInputStream
@@ -142,13 +157,14 @@ public abstract class SignalProtocol extends AbstractProtocol
class SignalOutputStream extends ChannelOutputStream
{
- public SignalOutputStream(final int correlationID, final short signalID)
+ public SignalOutputStream(final int correlationID, final short signalID,
+ final boolean addSignalID)
{
super(getChannel(), new BufferProvider()
{
private BufferProvider delegate = BufferUtil.getBufferProvider(getChannel());
- private boolean firstBuffer = true;
+ private boolean firstBuffer = addSignalID;
public short getBufferCapacity()
{
@@ -160,15 +176,15 @@ public abstract class SignalProtocol extends AbstractProtocol
Buffer buffer = delegate.provideBuffer();
ByteBuffer byteBuffer = buffer.startPutting(getChannel().getChannelID());
- System.out.println("Providing buffer for signal " + correlationID);
+ System.out.println("Providing buffer for correlation " + correlationID);
byteBuffer.putInt(correlationID);
if (firstBuffer)
{
System.out.println("Setting signal id " + signalID);
byteBuffer.putShort(signalID);
- firstBuffer = false;
}
+ firstBuffer = false;
return buffer;
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java
index 7d5e743871..9d48ae0e5b 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java
@@ -10,8 +10,6 @@
**************************************************************************/
package org.eclipse.net4j.signal;
-import java.io.InputStream;
-
/**
* @author Eike Stepper
*/
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/StrictSignalActor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/StrictSignalActor.java
new file mode 100644
index 0000000000..1e764f1f17
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/StrictSignalActor.java
@@ -0,0 +1,53 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.transport.Channel;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * @author Eike Stepper
+ */
+abstract class StrictSignalActor<RESULT> extends SignalActor<RESULT>
+{
+ boolean inputAllowed;
+
+ boolean outputAllowed;
+
+ protected StrictSignalActor(Channel channel)
+ {
+ super(channel);
+ }
+
+ @Override
+ protected final DataInputStream getDataInputStream()
+ {
+ if (!inputAllowed)
+ {
+ throw new IllegalStateException("Input not allowed");
+ }
+
+ return super.getDataInputStream();
+ }
+
+ @Override
+ protected final DataOutputStream getDataOutputStream()
+ {
+ if (!outputAllowed)
+ {
+ throw new IllegalStateException("Output not allowed");
+ }
+
+ return super.getDataOutputStream();
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/StrictSignalReactor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/StrictSignalReactor.java
new file mode 100644
index 0000000000..04d5b9e7a5
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/StrictSignalReactor.java
@@ -0,0 +1,50 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * @author Eike Stepper
+ */
+abstract class StrictSignalReactor extends SignalReactor
+{
+ boolean inputAllowed;
+
+ boolean outputAllowed;
+
+ protected StrictSignalReactor()
+ {
+ }
+
+ @Override
+ protected final DataInputStream getDataInputStream()
+ {
+ if (!inputAllowed)
+ {
+ throw new IllegalStateException("Input not allowed");
+ }
+
+ return super.getDataInputStream();
+ }
+
+ @Override
+ protected final DataOutputStream getDataOutputStream()
+ {
+ if (!outputAllowed)
+ {
+ throw new IllegalStateException("Output not allowed");
+ }
+
+ return super.getDataOutputStream();
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/HexUtil.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/HexUtil.java
new file mode 100644
index 0000000000..b18e8fb5c7
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/HexUtil.java
@@ -0,0 +1,45 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.util;
+
+/**
+ * @author Eike Stepper
+ */
+public final class HexUtil
+{
+ public static final char DIGITS[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b',
+ 'c', 'd', 'e', 'f', };
+
+ private HexUtil()
+ {
+ }
+
+ public static String toHex(int b)
+ {
+ assertByte(b);
+ return "" + DIGITS[b >> 4] + DIGITS[b & 0xf];
+ }
+
+ public static void appendHex(StringBuilder builder, int b)
+ {
+ assertByte(b);
+ builder.append(DIGITS[b >> 4]);
+ builder.append(DIGITS[b & 0xf]);
+ }
+
+ private static void assertByte(int b)
+ {
+ if (b < 0 || b > 255)
+ {
+ throw new IllegalArgumentException("b < 0 || b > 255");
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/ReflectUtil.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/ReflectUtil.java
new file mode 100644
index 0000000000..9d2e064cc2
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/ReflectUtil.java
@@ -0,0 +1,185 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.util;
+
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+/**
+ * @author Eike Stepper
+ */
+public final class ReflectUtil
+{
+ public static final Class<Object> ROOT_CLASS = Object.class;
+
+ public static final Class[] NO_PARAMETERS = null;
+
+ public static final Object[] NO_ARGUMENTS = null;
+
+ private static final String NL = System.getProperty("line.separator");
+
+ private static final Method hashCodeMethod = getHashCodeMethod();
+
+ private static final Map<Object, Long> ids = new WeakHashMap();
+
+ public static boolean DUMP_STATICS = false;
+
+ private static long lastID;
+
+ private ReflectUtil()
+ {
+ }
+
+ public static Integer getHashCode(Object object)
+ {
+ try
+ {
+ return (Integer)hashCodeMethod.invoke(object, NO_ARGUMENTS);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+
+ return 0;
+ }
+
+ public static Long getID(Object object)
+ {
+ Long id = ids.get(object);
+ if (id == null)
+ {
+ id = ++lastID;
+ ids.put(object, id);
+ }
+
+ return id;
+ }
+
+ public static String getSimpleName(Class<? extends Object> c)
+ {
+ String name = c.getName();
+ int lastDot = name.lastIndexOf('.');
+ if (lastDot != -1)
+ {
+ name = name.substring(lastDot + 1);
+ }
+
+ return name.replace('$', '.');
+ }
+
+ public static String getSimpleClassName(Object object)
+ {
+ return getSimpleName(object.getClass());
+ }
+
+ public static String getLabel(Object object)
+ {
+ return getSimpleClassName(object) + "@" + getID(object);
+ }
+
+ public static void dump(Object object)
+ {
+ dump(object, "");
+ }
+
+ public static void dump(Object object, String prefix)
+ {
+ dump(object, prefix, System.out);
+ }
+
+ public static void dump(Object object, String prefix, PrintStream out)
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append(prefix);
+ builder.append(getLabel(object));
+ builder.append(NL);
+ dumpSegment(object.getClass(), object, prefix + " ", builder);
+ out.print(builder.toString());
+ }
+
+ private static void dumpSegment(Class<? extends Object> segment, Object object, String prefix,
+ StringBuilder builder)
+ {
+ if (segment != ROOT_CLASS)
+ {
+ dumpSegment(segment.getSuperclass(), object, prefix, builder);
+ }
+
+ String segmentPrefix = segment == object.getClass() ? "" : getSimpleName(segment) + ".";
+ Field[] fields = segment.getDeclaredFields();
+ for (Field field : fields)
+ {
+ if (field.isSynthetic())
+ {
+ continue;
+ }
+
+ if ((field.getModifiers() & Modifier.STATIC) != 0 && !DUMP_STATICS)
+ {
+ continue;
+ }
+
+ builder.append(prefix);
+ builder.append(segmentPrefix);
+ builder.append(field.getName());
+ builder.append(" = ");
+ builder.append(getValue(object, field));
+ builder.append(NL);
+ }
+ }
+
+ public static Object getValue(Object object, Field field)
+ {
+ try
+ {
+ return field.get(object);
+ }
+ catch (IllegalAccessException ex)
+ {
+ field.setAccessible(true);
+ try
+ {
+ return field.get(object);
+ }
+ catch (IllegalAccessException ex1)
+ {
+ throw new RuntimeException(ex1);
+ }
+ }
+ }
+
+ private static Method getHashCodeMethod()
+ {
+ Method method;
+
+ try
+ {
+ method = ROOT_CLASS.getMethod("hashCode", NO_PARAMETERS);
+ }
+ catch (Exception ex)
+ {
+ // This can really not happen
+ throw new AssertionError();
+ }
+
+ if (!method.isAccessible())
+ {
+ method.setAccessible(true);
+ }
+
+ return method;
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/lifecycle/AbstractLifecycle.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/lifecycle/AbstractLifecycle.java
index 1fa2d4eb0d..3e37b306dd 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/lifecycle/AbstractLifecycle.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/lifecycle/AbstractLifecycle.java
@@ -1,5 +1,7 @@
package org.eclipse.net4j.util.lifecycle;
+import org.eclipse.net4j.util.ReflectUtil;
+
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -8,6 +10,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public abstract class AbstractLifecycle implements Lifecycle, LifecycleNotifier
{
+ public static boolean DUMP_ON_ACTIVATE = false;
+
+ public static boolean USE_LABEL = true;
+
private boolean active;
/**
@@ -37,6 +43,12 @@ public abstract class AbstractLifecycle implements Lifecycle, LifecycleNotifier
onAccessBeforeActivate();
onActivate();
active = true;
+
+ if (DUMP_ON_ACTIVATE)
+ {
+ ReflectUtil.dump(this, toString() + ": DUMP ");
+ }
+
fireLifecycleActivated();
}
}
@@ -71,6 +83,19 @@ public abstract class AbstractLifecycle implements Lifecycle, LifecycleNotifier
return active;
}
+ @Override
+ public String toString()
+ {
+ if (USE_LABEL)
+ {
+ return ReflectUtil.getLabel(this);
+ }
+ else
+ {
+ return super.toString();
+ }
+ }
+
protected void fireLifecycleActivated()
{
for (LifecycleListener listener : listeners)
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferInputStream.java
index 4208f91e91..eb6ef140b7 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferInputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferInputStream.java
@@ -12,6 +12,7 @@ package org.eclipse.net4j.util.stream;
import org.eclipse.net4j.transport.Buffer;
import org.eclipse.net4j.transport.BufferHandler;
+import org.eclipse.net4j.util.HexUtil;
import java.io.IOException;
import java.io.InputStream;
@@ -30,6 +31,8 @@ public class BufferInputStream extends InputStream implements BufferHandler
public static final long DEFAULT_MILLIS_INTERRUPT_CHECK = 100;
+ public static boolean TRACE = false;
+
private BlockingQueue<Buffer> buffers = new LinkedBlockingQueue<Buffer>();
private Buffer currentBuffer;
@@ -60,7 +63,14 @@ public class BufferInputStream extends InputStream implements BufferHandler
return -1;
}
- int result = currentBuffer.getByteBuffer().get() - Byte.MIN_VALUE;
+ final byte b = currentBuffer.getByteBuffer().get();
+ final int result = b < 0 ? ~b : b;
+ if (TRACE)
+ {
+ System.out.println("<-- " + HexUtil.toHex(result)
+ + (result >= 32 ? " " + Character.toString((char)result) : ""));
+ }
+
if (!currentBuffer.getByteBuffer().hasRemaining())
{
currentBuffer.release();
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferOutputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferOutputStream.java
index ab67ac766d..92f4342845 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferOutputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferOutputStream.java
@@ -13,6 +13,7 @@ package org.eclipse.net4j.util.stream;
import org.eclipse.net4j.transport.Buffer;
import org.eclipse.net4j.transport.BufferHandler;
import org.eclipse.net4j.transport.BufferProvider;
+import org.eclipse.net4j.util.HexUtil;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import java.io.IOException;
@@ -26,6 +27,8 @@ public class BufferOutputStream extends OutputStream
{
public static final boolean DEFAULT_PROPAGATE_CLOSE = false;
+ public static boolean TRACE = false;
+
private BufferHandler bufferHandler;
private BufferProvider bufferProvider;
@@ -61,8 +64,14 @@ public class BufferOutputStream extends OutputStream
public void write(int b) throws IOException
{
ensureBuffer();
+ if (TRACE)
+ {
+ System.out.println("--> " + HexUtil.toHex(b)
+ + (b >= 32 ? " " + Character.toString((char)b) : ""));
+ }
+
ByteBuffer buffer = currentBuffer.getByteBuffer();
- buffer.put((byte)(b + Byte.MIN_VALUE));
+ buffer.put((byte)(b > Byte.MAX_VALUE ? ~(b + Byte.MIN_VALUE) : b));
if (!buffer.hasRemaining())
{

Back to the top