summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2006-10-23 14:54:31 (EDT)
committerEike Stepper2006-10-23 14:54:31 (EDT)
commit9c8e247ed1b1b4565d29391f9c38535baff39f74 (patch)
treec65b1c76fce5f3e174e066e45b6eba91e74e2656
parent13984aa28c8f87088022a0557cc858a10c433a0a (diff)
downloadcdo-9c8e247ed1b1b4565d29391f9c38535baff39f74.zip
cdo-9c8e247ed1b1b4565d29391f9c38535baff39f74.tar.gz
cdo-9c8e247ed1b1b4565d29391f9c38535baff39f74.tar.bz2
Signal protocol
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java4
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java9
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java24
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferInputStream.java54
4 files changed, 45 insertions, 46 deletions
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java
index 8dcd2ce..8ff9db4 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java
@@ -146,7 +146,7 @@ public class SignalTest extends TestCase
Channel channel = connector.openChannel(TestSignalProtocol.PROTOCOL_ID);
int data = 0x0a;
- int result = new Request1(channel, data).start();
+ int result = new Request1(channel, data).send();
assertEquals(data, result);
}
@@ -167,7 +167,7 @@ public class SignalTest extends TestCase
Channel channel = connector.openChannel(TestSignalProtocol.PROTOCOL_ID);
byte[] data = TinyData.getBytes();
- byte[] result = new Request2(channel, data).start();
+ byte[] result = new Request2(channel, data).send();
assertTrue(Arrays.equals(data, result));
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
index 6b5beb4..820e97a 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
@@ -29,14 +29,19 @@ public abstract class SignalActor<RESULT> extends Signal
setCorrelationID(protocol.getNextCorrelationID());
}
- public RESULT start() throws Exception
+ public RESULT send() throws Exception
+ {
+ return send(0);
+ }
+
+ public RESULT send(long timeout) throws Exception
{
if (terminated)
{
throw new IllegalStateException("Terminated");
}
- getProtocol().startSignal(this);
+ getProtocol().startSignal(this, timeout);
terminated = true;
return result;
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
index f90b68f..e093a0f 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
@@ -14,7 +14,6 @@ import org.eclipse.net4j.transport.Buffer;
import org.eclipse.net4j.transport.BufferProvider;
import org.eclipse.net4j.transport.Channel;
import org.eclipse.net4j.util.stream.BufferInputStream;
-import org.eclipse.net4j.util.stream.BufferOutputStream;
import org.eclipse.net4j.util.stream.ChannelOutputStream;
import org.eclipse.internal.net4j.transport.AbstractProtocol;
@@ -73,8 +72,8 @@ public abstract class SignalProtocol extends AbstractProtocol
signal = createSignalReactor(signalID);
signal.setProtocol(this);
signal.setCorrelationID(-correlationID);
- signal.setInputStream(createInputStream());
- signal.setOutputStream(createOutputStream(-correlationID, signalID, false));
+ signal.setInputStream(new SignalInputStream());
+ signal.setOutputStream(new SignalOutputStream(-correlationID, signalID, false));
signals.put(-correlationID, signal);
executorService.execute(signal);
}
@@ -101,7 +100,7 @@ public abstract class SignalProtocol extends AbstractProtocol
protected abstract SignalReactor createSignalReactor(short signalID);
- void startSignal(SignalActor signalActor)
+ void startSignal(SignalActor signalActor, long timeout)
{
if (signalActor.getProtocol() != this)
{
@@ -110,9 +109,10 @@ public abstract class SignalProtocol extends AbstractProtocol
short signalID = signalActor.getSignalID();
int correlationID = signalActor.getCorrelationID();
- signalActor.setInputStream(createInputStream());
- signalActor.setOutputStream(createOutputStream(correlationID, signalID, true));
+ signalActor.setInputStream(new SignalInputStream());
+ signalActor.setOutputStream(new SignalOutputStream(correlationID, signalID, true));
signals.put(correlationID, signalActor);
+
signalActor.run();
}
@@ -138,16 +138,6 @@ public abstract class SignalProtocol extends AbstractProtocol
return correlationID;
}
- BufferInputStream createInputStream()
- {
- return new BufferInputStream();
- }
-
- BufferOutputStream createOutputStream(int correlationID, short signalID, boolean addSignalID)
- {
- return new SignalOutputStream(correlationID, signalID, addSignalID);
- }
-
class SignalInputStream extends BufferInputStream
{
public SignalInputStream()
@@ -180,7 +170,7 @@ public abstract class SignalProtocol extends AbstractProtocol
byteBuffer.putInt(correlationID);
if (firstBuffer)
{
- System.out.println("Setting signal id " + signalID);
+ System.out.println(SignalProtocol.this.toString() + ": Put signal id " + signalID);
byteBuffer.putShort(signalID);
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferInputStream.java
index eb6ef14..06131b2 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferInputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/stream/BufferInputStream.java
@@ -96,47 +96,51 @@ public class BufferInputStream extends InputStream implements BufferHandler
protected boolean ensureBuffer() throws IOException
{
+ final long check = getMillisInterruptCheck();
+ final long timeout = getMillisBeforeTimeout();
+
try
{
- final long check = getMillisInterruptCheck();
- final long timeout = getMillisBeforeTimeout();
-
- try
+ if (timeout == NO_TIMEOUT)
{
- if (timeout == NO_TIMEOUT)
+ while (currentBuffer == null)
{
- while (currentBuffer == null)
+ if (buffers == null)
{
- currentBuffer = buffers.poll(check, TimeUnit.MILLISECONDS);
+ // Stream has been closed
+ return false;
}
+
+ currentBuffer = buffers.poll(check, TimeUnit.MILLISECONDS);
}
- else
+ }
+ else
+ {
+ final long stop = System.currentTimeMillis() + timeout;
+ while (currentBuffer == null)
{
- final long stop = System.currentTimeMillis() + timeout;
- while (currentBuffer == null)
+ if (buffers == null)
{
- final long remaining = stop - System.currentTimeMillis();
- if (remaining <= 0)
- {
- return false;
- }
+ // Stream has been closed
+ return false;
+ }
- currentBuffer = buffers.poll(Math.min(remaining, check), TimeUnit.MILLISECONDS);
+ final long remaining = stop - System.currentTimeMillis();
+ if (remaining <= 0)
+ {
+ return false;
}
+
+ currentBuffer = buffers.poll(Math.min(remaining, check), TimeUnit.MILLISECONDS);
}
}
- catch (InterruptedException ex)
- {
- throw new IOException("Interrupted");
- }
-
- eos = currentBuffer.isEOS();
}
- catch (RuntimeException ex)
+ catch (InterruptedException ex)
{
- // TODO Remove
- ex.printStackTrace();
+ throw new IOException("Interrupted");
}
+
+ eos = currentBuffer.isEOS();
return true;
}