Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java16
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/ChannelTest.java153
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/ArrayRequest.java13
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/PartialReadIndication.java44
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/PartialReadRequest.java47
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/TestSignalProtocol.java15
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java63
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java11
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java92
9 files changed, 419 insertions, 35 deletions
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
index 8d44405277..473422710a 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
@@ -215,7 +215,7 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
{
if (inputBuffer == null)
{
- inputBuffer = getConfig().getBufferProvider().provideBuffer();
+ inputBuffer = provideBuffer();
}
ByteBuffer byteBuffer = inputBuffer.startGetting(socketChannel);
@@ -416,7 +416,7 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
@Override
protected void doDeactivate() throws Exception
{
- cancelSelectionKey();
+ cleanUp();
LifecycleUtil.deactivate(controlChannel);
controlChannel = null;
@@ -429,7 +429,7 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
protected void deactivateAsync()
{
// Cancel the selection immediately
- cancelSelectionKey();
+ cleanUp();
// Do the rest of the deactivation asynchronously
getConfig().getReceiveExecutor().execute(new Runnable()
@@ -442,8 +442,14 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
});
}
- private void cancelSelectionKey()
+ private void cleanUp()
{
+ if (inputBuffer != null)
+ {
+ inputBuffer.release();
+ inputBuffer = null;
+ }
+
if (selectionKey != null)
{
selectionKey.cancel();
@@ -479,7 +485,7 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
@Override
public ByteBuffer getBuffer()
{
- buffer = getConfig().getBufferProvider().provideBuffer();
+ buffer = provideBuffer();
ByteBuffer byteBuffer = buffer.startPutting(ControlChannel.CONTROL_CHANNEL_INDEX);
byteBuffer.put(ControlChannel.OPCODE_NEGOTIATION);
return byteBuffer;
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/ChannelTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/ChannelTest.java
index 427e44e345..972a613fd8 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/ChannelTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/ChannelTest.java
@@ -14,28 +14,38 @@ package org.eclipse.net4j.tests;
import static org.junit.Assert.assertArrayEquals;
import org.eclipse.net4j.ITransportConfigAware;
+import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.connector.IConnector;
+import org.eclipse.net4j.signal.SignalFinishedEvent;
+import org.eclipse.net4j.signal.SignalProtocol.InvalidSignalIDException;
import org.eclipse.net4j.tests.config.AbstractConfigTest;
import org.eclipse.net4j.tests.data.HugeData;
import org.eclipse.net4j.tests.data.TinyData;
import org.eclipse.net4j.tests.signal.ArrayRequest;
+import org.eclipse.net4j.tests.signal.PartialReadRequest;
import org.eclipse.net4j.tests.signal.TestSignalProtocol;
import org.eclipse.net4j.util.concurrent.MonitoredThread;
import org.eclipse.net4j.util.concurrent.MonitoredThread.MultiThreadMonitor;
+import org.eclipse.net4j.util.factory.ProductCreationException;
import org.eclipse.net4j.util.io.IOUtil;
import org.eclipse.net4j.util.lifecycle.ILifecycle;
import org.eclipse.net4j.util.lifecycle.LifecycleEventAdapter;
+import org.eclipse.internal.net4j.buffer.BufferPoolFactory;
+
import org.eclipse.spi.net4j.InternalChannel;
import org.eclipse.spi.net4j.InternalConnector;
+import java.nio.BufferUnderflowException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
/**
* @author Eike Stepper
@@ -301,6 +311,142 @@ public class ChannelTest extends AbstractConfigTest
enableConsole();
}
+ /**
+ * This test case tests the following situation:
+ * <p>
+ * A client sends a signal
+ */
+ public void testStreamFitsInOneBuffer() throws Exception
+ {
+ CountDownLatch signalFinished = new CountDownLatch(1);
+ AtomicReference<BufferUnderflowException> exception = new AtomicReference<>();
+
+ acceptorContainer.registerFactory(new TestSignalProtocol.Factory()
+ {
+ @Override
+ public TestSignalProtocol create(String description) throws ProductCreationException
+ {
+ TestSignalProtocol protocol = new TestSignalProtocol()
+ {
+ private int receivedBuffers;
+
+ @Override
+ public void handleBuffer(IBuffer buffer)
+ {
+ System.out.println("Received buffer " + receivedBuffers + " --> eos=" + buffer.isEOS());
+
+ // After the first buffer wait until the signal has finished.
+ if (receivedBuffers++ == 1)
+ {
+ await(signalFinished);
+ }
+
+ try
+ {
+ super.handleBuffer(buffer);
+ }
+ catch (BufferUnderflowException ex)
+ {
+ ex.printStackTrace();
+ exception.set(ex);
+ }
+ }
+ };
+
+ protocol.setVersion(version);
+ return protocol;
+ }
+ });
+
+ TestSignalProtocol protocol = openTestSignalProtocol();
+
+ InternalConnector serverConnector = (InternalConnector)getAcceptor().getAcceptedConnectors()[0];
+ IChannel serverChannel = serverConnector.getChannels().iterator().next();
+ TestSignalProtocol serverProtocol = (TestSignalProtocol)serverChannel.getReceiveHandler();
+ serverProtocol.addListener(event -> {
+ if (event instanceof SignalFinishedEvent)
+ {
+ signalFinished.countDown();
+ }
+ });
+
+ byte[] hugeData = HugeData.getBytes();
+
+ int headerSize = IBuffer.HEADER_SIZE;
+ headerSize += 4; // Correlation ID.
+ headerSize += 2; // Signal ID.
+ headerSize += 4; // Array length.
+
+ byte[] data = new byte[BufferPoolFactory.BUFFER_CAPACITY - headerSize];
+ System.arraycopy(hugeData, 0, data, 0, data.length);
+
+ new ArrayRequest(protocol, data, true).send(); // <-- Flush without EOS!
+ assertSame(null, exception.get());
+ }
+
+ /**
+ * This test case tests the following situation:
+ * <p>
+ * A client sends a signal
+ */
+ public void testPartialRead() throws Exception
+ {
+ CountDownLatch signalFinished = new CountDownLatch(1);
+ AtomicReference<InvalidSignalIDException> exception = new AtomicReference<>();
+
+ acceptorContainer.registerFactory(new TestSignalProtocol.Factory()
+ {
+ @Override
+ public TestSignalProtocol create(String description) throws ProductCreationException
+ {
+ TestSignalProtocol protocol = new TestSignalProtocol()
+ {
+ private int receivedBuffers;
+
+ @Override
+ public void handleBuffer(IBuffer buffer)
+ {
+ System.out.println("Received buffer " + receivedBuffers + " --> eos=" + buffer.isEOS());
+
+ // After the first buffer wait until the signal has finished.
+ if (receivedBuffers++ == 1)
+ {
+ await(signalFinished);
+ }
+
+ try
+ {
+ super.handleBuffer(buffer);
+ }
+ catch (InvalidSignalIDException ex)
+ {
+ ex.printStackTrace();
+ exception.set(ex);
+ }
+ }
+ };
+
+ protocol.setVersion(version);
+ return protocol;
+ }
+ });
+
+ TestSignalProtocol protocol = openTestSignalProtocol();
+
+ InternalConnector serverConnector = (InternalConnector)getAcceptor().getAcceptedConnectors()[0];
+ IChannel serverChannel = serverConnector.getChannels().iterator().next();
+ TestSignalProtocol serverProtocol = (TestSignalProtocol)serverChannel.getReceiveHandler();
+ serverProtocol.addListener(event -> {
+ if (event instanceof SignalFinishedEvent)
+ {
+ signalFinished.countDown();
+ }
+ });
+
+ new PartialReadRequest(protocol).send();
+ assertSame(null, exception.get());
+ }
+
@Override
protected void doSetUp() throws Exception
{
@@ -352,7 +498,12 @@ public class ChannelTest extends AbstractConfigTest
{
synchronized (protocols)
{
- protocol.getChannel().removeListener(this);
+ IChannel channel = protocol.getChannel();
+ if (channel != null)
+ {
+ channel.removeListener(this);
+ }
+
boolean removed = protocols.remove(protocol);
assertEquals(true, removed);
}
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/ArrayRequest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/ArrayRequest.java
index 8b357567e8..ca5e366542 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/ArrayRequest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/ArrayRequest.java
@@ -22,16 +22,29 @@ public class ArrayRequest extends RequestWithConfirmation<byte[]>
{
private byte[] data;
+ private boolean flush;
+
public ArrayRequest(SignalProtocol<?> protocol, byte[] data)
{
+ this(protocol, data, false);
+ }
+
+ public ArrayRequest(SignalProtocol<?> protocol, byte[] data, boolean flush)
+ {
super(protocol, TestSignalProtocol.SIGNAL_ARRAY);
this.data = data;
+ this.flush = flush;
}
@Override
protected void requesting(ExtendedDataOutputStream out) throws Exception
{
out.writeByteArray(data);
+
+ if (flush)
+ {
+ out.flush();
+ }
}
@Override
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/PartialReadIndication.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/PartialReadIndication.java
new file mode 100644
index 0000000000..aa4b369127
--- /dev/null
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/PartialReadIndication.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2007, 2008, 2011, 2012, 2015 Eike Stepper (Loehne, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.net4j.tests.signal;
+
+import org.eclipse.net4j.signal.IndicationWithResponse;
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
+
+/**
+ * @author Eike Stepper
+ */
+public class PartialReadIndication extends IndicationWithResponse
+{
+ public PartialReadIndication(TestSignalProtocol protocol)
+ {
+ super(protocol, TestSignalProtocol.SIGNAL_INT);
+ }
+
+ @Override
+ protected void indicating(ExtendedDataInputStream in) throws Exception
+ {
+ int data = in.readInt();
+ if (data != PartialReadRequest.DATA)
+ {
+ throw new Error("Read " + data + " instead of " + PartialReadRequest.DATA);
+ }
+
+ // Do not read the remaining 2 bytes here that PartialReadRequest has sent!
+ }
+
+ @Override
+ protected void responding(ExtendedDataOutputStream out) throws Exception
+ {
+ out.writeBoolean(true);
+ }
+}
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/PartialReadRequest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/PartialReadRequest.java
new file mode 100644
index 0000000000..3d125b2066
--- /dev/null
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/PartialReadRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2007, 2008, 2011, 2012, 2015 Eike Stepper (Loehne, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.net4j.tests.signal;
+
+import org.eclipse.net4j.signal.RequestWithConfirmation;
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
+
+/**
+ * @author Eike Stepper
+ */
+public class PartialReadRequest extends RequestWithConfirmation<Boolean>
+{
+ public static final int DATA = 4711;
+
+ public PartialReadRequest(TestSignalProtocol protocol)
+ {
+ super(protocol, TestSignalProtocol.SIGNAL_PARTIAL_READ);
+ }
+
+ @Override
+ protected void requesting(ExtendedDataOutputStream out) throws Exception
+ {
+ // This is the only data PartialReadIndication will read.
+ out.writeInt(DATA);
+
+ // This will cause the current buffer to be sent.
+ out.flush();
+
+ // This will be interpreted as the signalID of a new incoming indication by SignalProtocol.handleBuffer().
+ out.writeShort(Short.MAX_VALUE);
+ }
+
+ @Override
+ protected Boolean confirming(ExtendedDataInputStream in) throws Exception
+ {
+ return in.readBoolean();
+ }
+}
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/TestSignalProtocol.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/TestSignalProtocol.java
index 70023ab528..65f25c2dab 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/TestSignalProtocol.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/TestSignalProtocol.java
@@ -40,6 +40,8 @@ public class TestSignalProtocol extends SignalProtocol<Object>
public static final short SIGNAL_EXCEPTION = 6;
+ public static final short SIGNAL_PARTIAL_READ = 7;
+
public static final String SIMULATED_EXCEPTION = "Simulated exception"; //$NON-NLS-1$
private int version = super.getVersion();
@@ -96,6 +98,9 @@ public class TestSignalProtocol extends SignalProtocol<Object>
case SIGNAL_EXCEPTION:
return new ExceptionIndication(this);
+ case SIGNAL_PARTIAL_READ:
+ return new PartialReadIndication(this);
+
default:
return super.createSignalReactor(signalID);
}
@@ -128,17 +133,17 @@ public class TestSignalProtocol extends SignalProtocol<Object>
*/
public static class Factory extends ServerProtocolFactory
{
- private int version = IProtocol2.UNSPECIFIED_VERSION;
+ protected final int version;
- public Factory(int version)
+ public Factory()
{
- this();
- this.version = version;
+ this(IProtocol2.UNSPECIFIED_VERSION);
}
- public Factory()
+ public Factory(int version)
{
super(PROTOCOL_NAME);
+ this.version = version;
}
@Override
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
index 0243bfd26b..115e6f9efd 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
@@ -176,15 +176,30 @@ public class Buffer implements InternalBuffer
@Override
public void release()
{
- if (state != BufferState.RELEASED)
+ try
{
- state = BufferState.RELEASED;
- errorHandler = null;
- if (bufferProvider != null)
+ if (state != BufferState.RELEASED)
{
- bufferProvider.retainBuffer(this);
+ state = BufferState.RELEASED;
+ errorHandler = null;
+
+ if (bufferProvider != null)
+ {
+ bufferProvider.retainBuffer(this);
+ }
}
}
+ catch (Error ex)
+ {
+ // Don't swallow errors.
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ // A normal exception can only occur if the buffer provider implementation is buggy.
+ // Do not simply continue in this case because it's not known in which state this buffer is then.
+ throw new Error("Implementation error in buffer provider " + bufferProvider, ex);
+ }
}
@Override
@@ -663,20 +678,52 @@ public class Buffer implements InternalBuffer
public static String dump(ByteBuffer byteBuffer)
{
+ return dump(byteBuffer, false);
+ }
+
+ public static String dump(ByteBuffer byteBuffer, boolean fullCapacity)
+ {
final int position = byteBuffer.position();
final int limit = byteBuffer.limit();
StringBuilder builder = new StringBuilder();
for (int i = 0; i < limit; i++)
{
- int b = byteBuffer.get(i) & 0xFF; // Dump unsigned int instead of signed byte.
- builder.append('\u00A0'); // NO-BREAK SPACE. A bug in JDT's detail formatter hates normal spaces.
- builder.append(b);
+ dumpByte(byteBuffer, builder, i);
+ }
+
+ if (fullCapacity)
+ {
+ int capacity = byteBuffer.capacity();
+ if (limit < capacity)
+ {
+ byteBuffer.limit(capacity);
+
+ try
+ {
+ builder.append("\n--");
+ for (int i = limit; i < capacity; i++)
+ {
+ dumpByte(byteBuffer, builder, i);
+ }
+ }
+ finally
+ {
+ byteBuffer.limit(limit);
+ }
+ }
}
return "pos\u00A0" + position + "/" + limit + ":" + builder;
}
+ private static void dumpByte(ByteBuffer byteBuffer, StringBuilder builder, int i)
+ {
+ int b = byteBuffer.get(i) & 0xFF; // Dump unsigned int instead of signed byte.
+ builder.append('\u00A0'); // NO-BREAK SPACE. A bug in JDT's detail formatter hates normal spaces.
+ builder.append(b);
+ }
+
public static void main(String[] args) throws Exception
{
decodeBuffer("0001ffea0000026b001a0000000101ff000000000000058d00");
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
index dd18e7aead..6d5827df71 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
@@ -115,6 +115,10 @@ public class BufferInputStream extends InputStream implements IBufferHandler
{
buffers.add(buffer);
}
+ else
+ {
+ buffer.release();
+ }
}
@Override
@@ -253,8 +257,13 @@ public class BufferInputStream extends InputStream implements IBufferHandler
@Override
public void close() throws IOException
{
+ if (currentBuffer != null)
+ {
+ currentBuffer.release();
+ currentBuffer = null;
+ }
+
buffers = null;
- currentBuffer = null;
super.close();
if (ccam)
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
index 3b12f309d6..394ec29a67 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
@@ -85,9 +85,18 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
*/
public static final short SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS = -5;
- private static final int MIN_CORRELATION_ID = 1;
+ /**
+ * Begin Of Signal.
+ */
+ private static final int BOS_BIT = 1;
+
+ private static final int BOS_MASK = ~BOS_BIT;
- private static final int MAX_CORRELATION_ID = Integer.MAX_VALUE;
+ private static final int INC_CORRELATION_ID = 1 << BOS_BIT; // 2
+
+ private static final int MIN_CORRELATION_ID = INC_CORRELATION_ID;
+
+ private static final int MAX_CORRELATION_ID = Integer.MAX_VALUE & BOS_MASK; // 2147483646
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, SignalProtocol.class);
@@ -233,12 +242,27 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
{
ByteBuffer byteBuffer = buffer.getByteBuffer();
int correlationID = byteBuffer.getInt();
- if (TRACER.isEnabled())
+
+ boolean beginOfSignal = false;
+ if ((correlationID & BOS_BIT) != 0)
{
- TRACER.trace("Received buffer for correlation " + correlationID); //$NON-NLS-1$
+ correlationID &= BOS_MASK;
+ beginOfSignal = true;
+
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Received first buffer for correlation " + correlationID); //$NON-NLS-1$
+ }
+ }
+ else
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Received buffer for correlation " + correlationID); //$NON-NLS-1$
+ }
}
- Signal signal;
+ Signal signal = null;
boolean newSignalScheduled = false;
synchronized (signals)
@@ -246,8 +270,7 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
if (correlationID > 0)
{
// Incoming indication
- signal = signals.get(-correlationID);
- if (signal == null)
+ if (beginOfSignal)
{
short signalID = byteBuffer.getShort();
if (TRACER.isEnabled())
@@ -270,6 +293,10 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
newSignalScheduled = true;
}
}
+ else
+ {
+ signal = signals.get(-correlationID);
+ }
}
else
{
@@ -388,7 +415,7 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
SignalReactor signal = createSignalReactor(signalID);
if (signal == null)
{
- throw new IllegalArgumentException("Invalid signalID " + signalID); //$NON-NLS-1$
+ throw new InvalidSignalIDException(signalID);
}
return signal;
@@ -436,7 +463,7 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
}
else
{
- ++nextCorrelationID;
+ nextCorrelationID += INC_CORRELATION_ID;
}
return correlationID;
@@ -481,8 +508,10 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
void startSignal(SignalActor signalActor, long timeout) throws Exception
{
checkArg(signalActor.getProtocol() == this, "Wrong protocol"); //$NON-NLS-1$
+
short signalID = signalActor.getID();
int correlationID = signalActor.getCorrelationID();
+
signalActor.setBufferOutputStream(new SignalOutputStream(correlationID, signalID, true));
if (signalActor instanceof RequestWithConfirmation<?>)
{
@@ -622,6 +651,28 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
}
/**
+ * @author Eike Stepper
+ * @since 4.10
+ */
+ public static final class InvalidSignalIDException extends IllegalArgumentException
+ {
+ private static final long serialVersionUID = 1L;
+
+ private final short signalID;
+
+ public InvalidSignalIDException(short signalID)
+ {
+ super("Invalid signal ID " + signalID);
+ this.signalID = signalID;
+ }
+
+ public short getSignalID()
+ {
+ return signalID;
+ }
+ }
+
+ /**
* An {@link IEvent event} fired from a {@link ISignalProtocol signal protocol} when the protocol {@link ISignalProtocol#setTimeout(long) timeout}
* has been changed.
*
@@ -698,13 +749,15 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
*/
class SignalOutputStream extends ChannelOutputStream
{
- public SignalOutputStream(final int correlationID, final short signalID, final boolean addSignalID)
+ public SignalOutputStream(int correlationID, short signalID, boolean request)
{
super(getChannel(), new IBufferProvider()
{
private IBufferProvider delegate = getBufferProvider();
- private boolean firstBuffer = addSignalID;
+ private boolean beginOfSignal = true;
+
+ private boolean addSignalID = request;
@Override
public short getBufferCapacity()
@@ -723,14 +776,23 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
IBuffer buffer = delegate.provideBuffer();
ByteBuffer byteBuffer = buffer.startPutting(channel.getID());
- byteBuffer.putInt(correlationID);
- if (firstBuffer)
+ if (beginOfSignal)
+ {
+ byteBuffer.putInt(correlationID | BOS_BIT);
+ beginOfSignal = false;
+
+ if (addSignalID)
+ {
+ byteBuffer.putShort(signalID);
+ addSignalID = false;
+ }
+ }
+ else
{
- byteBuffer.putShort(signalID);
+ byteBuffer.putInt(correlationID);
}
- firstBuffer = false;
return buffer;
}

Back to the top