Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2006-10-22 16:28:22 -0400
committerEike Stepper2006-10-22 16:28:22 -0400
commit697d6277d2715b06e41abc790ab8b053df1520c7 (patch)
tree42cf443c1076c70add0e3e10a4658097f4bff574 /plugins/org.eclipse.net4j
parentd2b31d4e130e492fe6a7581612a85d35659ef4b3 (diff)
downloadcdo-697d6277d2715b06e41abc790ab8b053df1520c7.tar.gz
cdo-697d6277d2715b06e41abc790ab8b053df1520c7.tar.xz
cdo-697d6277d2715b06e41abc790ab8b053df1520c7.zip
Signal protocol
Diffstat (limited to 'plugins/org.eclipse.net4j')
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java15
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractProtocolFactory.java4
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java20
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java109
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/MessageDeserializer.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/Serializer.java)4
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/MessageSerializer.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/Deserializer.java)4
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java39
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java93
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java71
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java132
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java)8
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java127
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/SynchronousWorkSerializer.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/Asynchronizer.java)15
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/Worker.java)10
14 files changed, 487 insertions, 164 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java
index f1dc579db4..cbf6991284 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java
@@ -57,7 +57,7 @@ public abstract class AbstractConnector extends AbstractLifecycle implements Con
/**
* TODO synchronize on channels?
*/
- private List<ChannelImpl> channels = new ArrayList();
+ private List<ChannelImpl> channels = new ArrayList(0);
private State state = State.DISCONNECTED;
@@ -336,14 +336,8 @@ public abstract class AbstractConnector extends AbstractLifecycle implements Con
{
ChannelImpl channel = new ChannelImpl(receiveExecutor);
Protocol protocol = createProtocol(protocolID, channel);
- if (protocol == null)
- {
- System.out.println(toString() + ": Opening channel without protocol");
- }
- else
- {
- System.out.println(toString() + ": Opening channel with protocol " + protocolID);
- }
+ System.out.println(toString() + ": Opening channel " + channelID
+ + (protocol == null ? " without protocol" : " with protocol " + protocolID));
channel.setChannelID(channelID);
channel.setConnector(this);
@@ -379,7 +373,7 @@ public abstract class AbstractConnector extends AbstractLifecycle implements Con
{
for (final ChannelImpl channel : channels)
{
- if (channel != NULL_CHANNEL)
+ if (channel != NULL_CHANNEL && channel.isActive())
{
Queue<Buffer> bufferQueue = channel.getSendQueue();
result.add(bufferQueue);
@@ -403,7 +397,6 @@ public abstract class AbstractConnector extends AbstractLifecycle implements Con
}
}
- channels.add(NULL_CHANNEL);
return (short)size;
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractProtocolFactory.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractProtocolFactory.java
index ccd55fe90a..b3f3ae40a9 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractProtocolFactory.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractProtocolFactory.java
@@ -18,6 +18,10 @@ import org.eclipse.net4j.transport.Connector.Type;
*/
public abstract class AbstractProtocolFactory implements ProtocolFactory
{
+ public AbstractProtocolFactory()
+ {
+ }
+
public final boolean isForClients()
{
return getConnectorTypes().contains(Type.CLIENT);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java
index aa3e2ed908..1f04a58c39 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java
@@ -182,16 +182,26 @@ public class BufferImpl implements Buffer
public ByteBuffer startPutting(short channelID)
{
- if (state != State.INITIAL)
+ if (state == State.PUTTING)
+ {
+ if (channelID != this.channelID)
+ {
+ throw new IllegalArgumentException("channelID != this.channelID");
+ }
+ }
+ else if (state != State.INITIAL)
{
throw new IllegalStateException("state == " + state);
}
+ else
+ {
+ state = State.PUTTING;
+ this.channelID = channelID;
- state = State.PUTTING;
- this.channelID = channelID;
+ byteBuffer.clear();
+ byteBuffer.position(BufferImpl.HEADER_SIZE);
+ }
- byteBuffer.clear();
- byteBuffer.position(BufferImpl.HEADER_SIZE);
return byteBuffer;
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java
index f4cce359d2..6aaacc8f80 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java
@@ -15,6 +15,9 @@ import org.eclipse.net4j.transport.BufferHandler;
import org.eclipse.net4j.transport.BufferProvider;
import org.eclipse.net4j.transport.Channel;
import org.eclipse.net4j.transport.Connector;
+import org.eclipse.net4j.util.concurrent.AsynchronousWorkSerializer;
+import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer;
+import org.eclipse.net4j.util.concurrent.WorkSerializer;
import org.eclipse.net4j.util.lifecycle.AbstractLifecycle;
import org.eclipse.internal.net4j.transport.BufferImpl.State;
@@ -26,7 +29,7 @@ import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
*/
-public class ChannelImpl extends AbstractLifecycle implements Channel, BufferProvider, Runnable
+public class ChannelImpl extends AbstractLifecycle implements Channel, BufferProvider
{
private short channelID = BufferImpl.NO_CHANNEL;
@@ -38,22 +41,11 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
*/
private BufferHandler receiveHandler;
- /**
- * An optional executor that is used to process the {@link #receiveQueue}
- * instead of the current thread. If not <code>null</code> the sender and
- * the receiver peers become decoupled.
- * <p>
- */
private ExecutorService receiveExecutor;
- private Occupation receiveExecutorOccupation = new Occupation();
-
- /**
- * TODO Optimize for embedded transport
- */
- private Queue<Buffer> receiveQueue = new ConcurrentLinkedQueue();
+ private WorkSerializer receiveSerializer;
- private Queue<Buffer> sendQueue = new ConcurrentLinkedQueue();
+ private Queue<Buffer> sendQueue;
public ChannelImpl(ExecutorService receiveExecutor)
{
@@ -105,16 +97,6 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
return sendQueue;
}
- public ExecutorService getReceiveExecutor()
- {
- return receiveExecutor;
- }
-
- public Queue<Buffer> getReceiveQueue()
- {
- return receiveQueue;
- }
-
public BufferHandler getReceiveHandler()
{
return receiveHandler;
@@ -125,6 +107,11 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
this.receiveHandler = receiveHandler;
}
+ public ExecutorService getReceiveExecutor()
+ {
+ return receiveExecutor;
+ }
+
public void close()
{
deactivate();
@@ -148,7 +135,7 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
connector.multiplexBuffer(this);
}
- public void handleBufferFromMultiplexer(Buffer buffer)
+ public void handleBufferFromMultiplexer(final Buffer buffer)
{
if (receiveHandler == null)
{
@@ -157,46 +144,13 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
return;
}
- if (receiveExecutor == null)
- {
- // Bypass the receiveQueue
- receiveHandler.handleBuffer(buffer);
- return;
- }
-
- receiveQueue.add(buffer);
-
- // isOccupied can (and must) be called unsynchronized here
- if (receiveExecutorOccupation.isOccupied())
- {
- return;
- }
-
- synchronized (receiveExecutorOccupation)
+ receiveSerializer.addWork(new Runnable()
{
- receiveExecutorOccupation.setOccupied(true);
- }
-
- System.out.println(toString() + ": Spawning new receive executor");
- receiveExecutor.execute(this);
- }
-
- /**
- * Executed in the context of the {@link #receiveExecutor}.
- * <p>
- */
- public void run()
- {
- synchronized (receiveExecutorOccupation)
- {
- Buffer buffer;
- while ((buffer = receiveQueue.poll()) != null)
+ public void run()
{
receiveHandler.handleBuffer(buffer);
}
-
- receiveExecutorOccupation.setOccupied(false);
- }
+ });
}
@Override
@@ -221,27 +175,26 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
}
@Override
- protected void onDeactivate() throws Exception
+ protected void onActivate() throws Exception
{
- sendQueue.clear();
- super.onDeactivate();
- }
-
- /**
- * @author Eike Stepper
- */
- private static class Occupation
- {
- private boolean occupied = false;
-
- public boolean isOccupied()
+ super.onActivate();
+ sendQueue = new ConcurrentLinkedQueue();
+ if (receiveExecutor == null)
{
- return occupied;
+ receiveSerializer = new SynchronousWorkSerializer();
}
-
- public void setOccupied(boolean occupied)
+ else
{
- this.occupied = occupied;
+ receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor);
}
}
+
+ @Override
+ protected void onDeactivate() throws Exception
+ {
+ receiveSerializer = null;
+ sendQueue.clear();
+ sendQueue = null;
+ super.onDeactivate();
+ }
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/Serializer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/MessageDeserializer.java
index fb6e4f7594..30b5dbc593 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/Serializer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/MessageDeserializer.java
@@ -11,12 +11,12 @@
package org.eclipse.net4j.message;
/**
- * TODO The {@link Serializer} class.
+ * TODO The {@link MessageDeserializer} class.
* <p>
*
* @author Eike Stepper
*/
-public interface Serializer
+public interface MessageDeserializer
{
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/Deserializer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/MessageSerializer.java
index b1a50a5d9c..95dc915f75 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/Deserializer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/MessageSerializer.java
@@ -11,12 +11,12 @@
package org.eclipse.net4j.message;
/**
- * TODO The {@link Deserializer} class.
+ * TODO The {@link MessageSerializer} class.
* <p>
*
* @author Eike Stepper
*/
-public interface Deserializer
+public interface MessageSerializer
{
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java
deleted file mode 100644
index f9f75314c2..0000000000
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/***************************************************************************
- * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- **************************************************************************/
-package org.eclipse.net4j.signal;
-
-import java.io.OutputStream;
-
-/**
- * @author Eike Stepper
- */
-public abstract class Request extends Signal
-{
- protected Request(SignalProtocol protocol)
- {
- setProtocol(protocol);
- setCorrelationID(protocol.getNextCorrelationID());
- }
-
- public Object send() throws Exception
- {
- return getProtocol().sendRequest(this);
- }
-
- @Override
- public String toString()
- {
- return "Request[" + getSignalID() + ", " + getProtocol() + ", correlation="
- + getCorrelationID() + "]";
- }
-
- protected abstract void requesting(OutputStream stream) throws Exception;
-}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
index 9fffa816ed..7bc430518e 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
@@ -10,29 +10,106 @@
**************************************************************************/
package org.eclipse.net4j.signal;
+import org.eclipse.net4j.util.stream.BufferInputStream;
+import org.eclipse.net4j.util.stream.BufferOutputStream;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
/**
* @author Eike Stepper
*/
-public abstract class Signal
+public abstract class Signal implements Runnable
{
private SignalProtocol protocol;
private int correlationID;
+ private BufferInputStream inputStream;
+
+ private BufferOutputStream outputStream;
+
+ private DataInputStream dataInputStream;
+
+ private DataOutputStream dataOutputStream;
+
protected Signal()
{
}
- protected SignalProtocol getProtocol()
+ protected final SignalProtocol getProtocol()
{
return protocol;
}
- protected int getCorrelationID()
+ protected final int getCorrelationID()
{
return correlationID;
}
+ protected final BufferInputStream getInputStream()
+ {
+ return inputStream;
+ }
+
+ protected final BufferOutputStream getOutputStream()
+ {
+ return outputStream;
+ }
+
+ protected DataInputStream getDataInputStream()
+ {
+ if (dataInputStream == null)
+ {
+ dataInputStream = new DataInputStream(inputStream);
+ }
+
+ return dataInputStream;
+ }
+
+ protected DataOutputStream getDataOutputStream()
+ {
+ if (dataOutputStream == null)
+ {
+ dataOutputStream = new DataOutputStream(outputStream);
+ }
+
+ return dataOutputStream;
+ }
+
+ protected void writeByteArray(byte[] bytes) throws IOException
+ {
+ getDataOutputStream().writeInt(bytes.length);
+ getDataOutputStream().write(bytes);
+ }
+
+ protected byte[] readByteArray() throws IOException
+ {
+ int length = getDataInputStream().readInt();
+ byte[] bytes = new byte[length];
+ getDataInputStream().read(bytes);
+ return bytes;
+ }
+
+ public final void run()
+ {
+ try
+ {
+ execute(inputStream, outputStream);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ finally
+ {
+ getProtocol().stopSignal(this);
+ }
+ }
+
+ protected abstract void execute(BufferInputStream in, BufferOutputStream out) throws Exception;
+
protected abstract short getSignalID();
void setProtocol(SignalProtocol protocol)
@@ -44,4 +121,14 @@ public abstract class Signal
{
this.correlationID = correlationID;
}
+
+ void setInputStream(BufferInputStream inputStream)
+ {
+ this.inputStream = inputStream;
+ }
+
+ void setOutputStream(BufferOutputStream outputStream)
+ {
+ this.outputStream = outputStream;
+ }
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
new file mode 100644
index 0000000000..6b5beb4a28
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
@@ -0,0 +1,71 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.transport.BufferHandler;
+import org.eclipse.net4j.transport.Channel;
+
+/**
+ * @author Eike Stepper
+ */
+public abstract class SignalActor<RESULT> extends Signal
+{
+ private boolean terminated;
+
+ private RESULT result;
+
+ protected SignalActor(Channel channel)
+ {
+ SignalProtocol protocol = extractSignalProtocol(channel);
+ setProtocol(protocol);
+ setCorrelationID(protocol.getNextCorrelationID());
+ }
+
+ public RESULT start() throws Exception
+ {
+ if (terminated)
+ {
+ throw new IllegalStateException("Terminated");
+ }
+
+ getProtocol().startSignal(this);
+ terminated = true;
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SignalActor[" + getSignalID() + ", " + getProtocol() + ", correlation="
+ + getCorrelationID() + (terminated ? ", SENT" : "") + "]";
+ }
+
+ protected void setResult(RESULT result)
+ {
+ this.result = result;
+ }
+
+ private static SignalProtocol extractSignalProtocol(Channel channel)
+ {
+ BufferHandler receiveHandler = channel.getReceiveHandler();
+ if (receiveHandler == null)
+ {
+ throw new IllegalArgumentException("Channel has no protocol");
+ }
+
+ if (!(receiveHandler instanceof SignalProtocol))
+ {
+ throw new IllegalArgumentException("Channel has no signal protocol");
+ }
+
+ return (SignalProtocol)receiveHandler;
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
index b65bd9cd18..2706d6c454 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
@@ -11,31 +11,71 @@
package org.eclipse.net4j.signal;
import org.eclipse.net4j.transport.Buffer;
+import org.eclipse.net4j.transport.BufferProvider;
import org.eclipse.net4j.transport.Channel;
+import org.eclipse.net4j.util.stream.BufferInputStream;
+import org.eclipse.net4j.util.stream.BufferOutputStream;
+import org.eclipse.net4j.util.stream.ChannelOutputStream;
import org.eclipse.internal.net4j.transport.AbstractProtocol;
+import org.eclipse.internal.net4j.transport.BufferUtil;
+import org.eclipse.internal.net4j.transport.ChannelImpl;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
*/
public abstract class SignalProtocol extends AbstractProtocol
{
- private int nextCorrelationID;
+ private ExecutorService executorService;
- private Queue<Request> requestQueue = new ConcurrentLinkedQueue();
+ private Map<Integer, Signal> signals = new ConcurrentHashMap();
- public SignalProtocol(Channel channel)
+ private int nextCorrelationID = 0;
+
+ protected SignalProtocol(Channel channel, ExecutorService executorService)
{
super(channel);
+
+ if (executorService == null)
+ {
+ throw new IllegalArgumentException("executorService == null");
+ }
+
+ this.executorService = executorService;
+ }
+
+ protected SignalProtocol(Channel channel)
+ {
+ this(channel, ((ChannelImpl)channel).getReceiveExecutor());
}
public void handleBuffer(Buffer buffer)
{
- // TODO Implement method SignalProtocol
+ ByteBuffer byteBuffer = buffer.getByteBuffer();
+ int correlationID = byteBuffer.getInt();
+ System.out.println("Received buffer for signal " + correlationID);
+
+ Signal signal = signals.get(correlationID);
+ if (signal == null)
+ {
+ short signalID = byteBuffer.getShort();
+ System.out.println("Got signal id " + signalID);
+ signal = createSignalReactor(signalID);
+ signal.setProtocol(this);
+ signal.setCorrelationID(correlationID);
+ signal.setInputStream(createInputStream());
+ signal.setOutputStream(createOutputStream(correlationID, signalID));
+ signals.put(correlationID, signal);
+ executorService.execute(signal);
+ }
+
+ signal.getInputStream().handleBuffer(buffer);
}
@Override
@@ -44,12 +84,27 @@ public abstract class SignalProtocol extends AbstractProtocol
return "SignalProtocol[" + getProtocolID() + ", " + getChannel() + "]";
}
- protected abstract Indication createIndication(short signalID);
+ protected abstract SignalReactor createSignalReactor(short signalID);
+
+ void startSignal(SignalActor signalActor)
+ {
+ if (signalActor.getProtocol() != this)
+ {
+ throw new IllegalArgumentException("signalActor.getProtocol() != this");
+ }
+
+ short signalID = signalActor.getSignalID();
+ int correlationID = signalActor.getCorrelationID();
+ signalActor.setInputStream(createInputStream());
+ signalActor.setOutputStream(createOutputStream(correlationID, signalID));
+ signals.put(correlationID, signalActor);
+ signalActor.run();
+ }
- Object sendRequest(Request request)
+ void stopSignal(Signal signal)
{
- // TODO Implement method SignalProtocol
- return null;
+ int correlationID = signal.getCorrelationID();
+ signals.remove(correlationID);
}
int getNextCorrelationID()
@@ -67,4 +122,61 @@ public abstract class SignalProtocol extends AbstractProtocol
return correlationID;
}
+
+ BufferInputStream createInputStream()
+ {
+ return new BufferInputStream();
+ }
+
+ BufferOutputStream createOutputStream(int correlationID, short signalID)
+ {
+ return new SignalOutputStream(correlationID, signalID);
+ }
+
+ class SignalInputStream extends BufferInputStream
+ {
+ public SignalInputStream()
+ {
+ }
+ }
+
+ class SignalOutputStream extends ChannelOutputStream
+ {
+ public SignalOutputStream(final int correlationID, final short signalID)
+ {
+ super(getChannel(), new BufferProvider()
+ {
+ private BufferProvider delegate = BufferUtil.getBufferProvider(getChannel());
+
+ private boolean firstBuffer = true;
+
+ public short getBufferCapacity()
+ {
+ return delegate.getBufferCapacity();
+ }
+
+ public Buffer provideBuffer()
+ {
+ Buffer buffer = delegate.provideBuffer();
+ ByteBuffer byteBuffer = buffer.startPutting(getChannel().getChannelID());
+
+ System.out.println("Providing buffer for signal " + correlationID);
+ byteBuffer.putInt(correlationID);
+ if (firstBuffer)
+ {
+ System.out.println("Setting signal id " + signalID);
+ byteBuffer.putShort(signalID);
+ firstBuffer = false;
+ }
+
+ return buffer;
+ }
+
+ public void retainBuffer(Buffer buffer)
+ {
+ delegate.retainBuffer(buffer);
+ }
+ });
+ }
+ }
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java
index a91fefe844..7d5e743871 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java
@@ -15,18 +15,16 @@ import java.io.InputStream;
/**
* @author Eike Stepper
*/
-public abstract class Indication extends Signal
+public abstract class SignalReactor extends Signal
{
- protected Indication()
+ protected SignalReactor()
{
}
@Override
public String toString()
{
- return "Indication[" + getSignalID() + ", " + getProtocol() + ", correlation="
+ return "SignalReactor[" + getSignalID() + ", " + getProtocol() + ", correlation="
+ getCorrelationID() + "]";
}
-
- protected abstract void indicating(InputStream stream) throws Exception;
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java
new file mode 100644
index 0000000000..479b58a64a
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java
@@ -0,0 +1,127 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.util.concurrent;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @author Eike Stepper
+ */
+public class AsynchronousWorkSerializer implements WorkSerializer, Runnable
+{
+ private ExecutorService executorService;
+
+ private Queue<Runnable> workQueue;
+
+ private Occupation occupation = new Occupation();
+
+ public AsynchronousWorkSerializer(ExecutorService executorService, Queue<Runnable> workQueue)
+ {
+ if (executorService == null)
+ {
+ throw new IllegalArgumentException("executorService == null");
+ }
+
+ this.executorService = executorService;
+ this.workQueue = workQueue;
+ }
+
+ public AsynchronousWorkSerializer(ExecutorService executorService)
+ {
+ this(executorService, new ConcurrentLinkedQueue());
+ }
+
+ public ExecutorService getExecutorService()
+ {
+ return executorService;
+ }
+
+ public void addWork(Runnable work)
+ {
+ workQueue.add(work);
+
+ // isOccupied can (and must) be called unsynchronized here
+ if (!occupation.isOccupied())
+ {
+ synchronized (occupation)
+ {
+ occupation.setOccupied(true);
+ }
+
+ System.out.println(toString() + ": Notifying executor service");
+ executorService.execute(this);
+ }
+ }
+
+ /**
+ * Executed in the context of the
+ * {@link #getExecutorService() executor service}.
+ * <p>
+ */
+ public void run()
+ {
+ synchronized (occupation)
+ {
+ Runnable work;
+ while (occupation.isOccupied() && (work = workQueue.poll()) != null)
+ {
+ try
+ {
+ work.run();
+ }
+ catch (RuntimeException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ occupation.setOccupied(false);
+ }
+ }
+
+ public void dispose()
+ {
+ if (occupation.isOccupied())
+ {
+ occupation.setOccupied(false);
+ }
+
+ workQueue.clear();
+ workQueue = null;
+ executorService = null;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "AsynchronousWorkSerializer[" + executorService + "]";
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private static class Occupation
+ {
+ private boolean occupied = false;
+
+ public boolean isOccupied()
+ {
+ return occupied;
+ }
+
+ public void setOccupied(boolean occupied)
+ {
+ this.occupied = occupied;
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/Asynchronizer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/SynchronousWorkSerializer.java
index 60840f0777..d727aaf974 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/Asynchronizer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/SynchronousWorkSerializer.java
@@ -13,13 +13,24 @@ package org.eclipse.net4j.util.concurrent;
/**
* @author Eike Stepper
*/
-public class Asynchronizer
+public class SynchronousWorkSerializer implements WorkSerializer
{
- public Asynchronizer()
+ public SynchronousWorkSerializer()
{
}
public void addWork(Runnable work)
{
+ work.run();
+ }
+
+ public void dispose()
+ {
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SynchronousWorkSerializer";
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/Worker.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java
index 0a1d268b18..4539855c14 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/Worker.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java
@@ -13,13 +13,9 @@ package org.eclipse.net4j.util.concurrent;
/**
* @author Eike Stepper
*/
-public class Worker
+public interface WorkSerializer
{
- public Worker()
- {
- }
+ public void addWork(Runnable work);
- public void addWork(Runnable work)
- {
- }
+ public void dispose();
}

Back to the top