diff options
author | Eike Stepper | 2006-10-22 20:28:22 +0000 |
---|---|---|
committer | Eike Stepper | 2006-10-22 20:28:22 +0000 |
commit | 697d6277d2715b06e41abc790ab8b053df1520c7 (patch) | |
tree | 42cf443c1076c70add0e3e10a4658097f4bff574 /plugins | |
parent | d2b31d4e130e492fe6a7581612a85d35659ef4b3 (diff) | |
download | cdo-697d6277d2715b06e41abc790ab8b053df1520c7.tar.gz cdo-697d6277d2715b06e41abc790ab8b053df1520c7.tar.xz cdo-697d6277d2715b06e41abc790ab8b053df1520c7.zip |
Signal protocol
Diffstat (limited to 'plugins')
17 files changed, 765 insertions, 141 deletions
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java new file mode 100644 index 0000000000..9967a59603 --- /dev/null +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java @@ -0,0 +1,141 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.tests; + +import org.eclipse.net4j.tests.signal.Signal1Actor; +import org.eclipse.net4j.tests.signal.TestSignalProtocol; +import org.eclipse.net4j.transport.BufferProvider; +import org.eclipse.net4j.transport.Channel; +import org.eclipse.net4j.transport.ProtocolFactory; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.registry.HashMapRegistry; +import org.eclipse.net4j.util.registry.IRegistry; + +import org.eclipse.internal.net4j.transport.BufferUtil; +import org.eclipse.internal.net4j.transport.tcp.AbstractTCPConnector; +import org.eclipse.internal.net4j.transport.tcp.TCPAcceptorImpl; +import org.eclipse.internal.net4j.transport.tcp.TCPSelectorImpl; +import org.eclipse.internal.net4j.transport.tcp.TCPUtil; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import junit.framework.TestCase; + +/** + * @author Eike Stepper + */ +public class SignalTest extends TestCase +{ + private BufferProvider bufferPool; + + private TCPSelectorImpl selector; + + private TCPAcceptorImpl acceptor; + + private AbstractTCPConnector connector; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + System.out.print("================================= "); + System.out.print(getName()); + System.out.println(" ================================="); + + bufferPool = BufferUtil.createBufferPool(); + LifecycleUtil.activate(bufferPool); + assertTrue(LifecycleUtil.isActive(bufferPool)); + + selector = (TCPSelectorImpl)TCPUtil.createTCPSelector(); + selector.activate(); + assertTrue(selector.isActive()); + + acceptor = (TCPAcceptorImpl)TCPUtil.createTCPAcceptor(bufferPool, selector); + connector = (AbstractTCPConnector)TCPUtil.createTCPConnector(bufferPool, selector, "localhost"); + } + + @Override + protected void tearDown() throws Exception + { + try + { + if (connector != null) + { + connector.disconnect(); + assertFalse(connector.isActive()); + assertFalse(connector.isConnected()); + connector = null; + } + } + catch (Exception ex) + { + ex.printStackTrace(); + } + + try + { + acceptor.deactivate(); + assertFalse(acceptor.isActive()); + acceptor = null; + } + catch (Exception ex) + { + ex.printStackTrace(); + } + + try + { + selector.deactivate(); + selector = null; + } + catch (Exception ex) + { + ex.printStackTrace(); + } + + try + { + LifecycleUtil.deactivate(bufferPool); + bufferPool = null; + } + catch (Exception ex) + { + ex.printStackTrace(); + } + + System.out.println(); + System.out.println(); + Thread.sleep(10); + super.tearDown(); + } + + public void testConnect() throws Exception + { + ExecutorService threadPool = Executors.newCachedThreadPool(); + IRegistry<String, ProtocolFactory> registry = new HashMapRegistry(); + registry.register(new TestSignalProtocol.Factory()); + + acceptor.setReceiveExecutor(threadPool); + acceptor.setProtocolFactoryRegistry(registry); + acceptor.activate(); + assertTrue(acceptor.isActive()); + + connector.setReceiveExecutor(threadPool); + connector.setProtocolFactoryRegistry(registry); + assertTrue(connector.connect(1000)); + + Channel channel = connector.openChannel(TestSignalProtocol.PROTOCOL_ID); + byte[] data = HugeData.getBytes(); + byte[] result = new Signal1Actor(channel, data).start(); + assertEquals(data, result); + } +} diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/Signal1Actor.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/Signal1Actor.java new file mode 100644 index 0000000000..3d3da78466 --- /dev/null +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/Signal1Actor.java @@ -0,0 +1,46 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.tests.signal; + +import org.eclipse.net4j.signal.SignalActor; +import org.eclipse.net4j.transport.Channel; +import org.eclipse.net4j.util.stream.BufferInputStream; +import org.eclipse.net4j.util.stream.BufferOutputStream; + +/** + * @author Eike Stepper + */ +public class Signal1Actor extends SignalActor<byte[]> +{ + private byte[] data; + + public Signal1Actor(Channel channel, byte[] data) + { + super(channel); + this.data = data; + } + + @Override + protected short getSignalID() + { + return TestSignalProtocol.SIGNAL1; + } + + @Override + protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception + { + writeByteArray(data); + out.flush(); + + byte[] result = readByteArray(); + setResult(result); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/Signal1Reactor.java index f9f75314c2..ce8586ab91 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/Signal1Reactor.java @@ -8,32 +8,29 @@ * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ -package org.eclipse.net4j.signal; +package org.eclipse.net4j.tests.signal; -import java.io.OutputStream; +import org.eclipse.net4j.signal.SignalReactor; +import org.eclipse.net4j.util.stream.BufferInputStream; +import org.eclipse.net4j.util.stream.BufferOutputStream; /** * @author Eike Stepper */ -public abstract class Request extends Signal +public class Signal1Reactor extends SignalReactor { - protected Request(SignalProtocol protocol) - { - setProtocol(protocol); - setCorrelationID(protocol.getNextCorrelationID()); - } - - public Object send() throws Exception + @Override + protected short getSignalID() { - return getProtocol().sendRequest(this); + return TestSignalProtocol.SIGNAL1; } @Override - public String toString() + protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception { - return "Request[" + getSignalID() + ", " + getProtocol() + ", correlation=" - + getCorrelationID() + "]"; - } + byte[] data = readByteArray(); - protected abstract void requesting(OutputStream stream) throws Exception; + writeByteArray(data); + out.flush(); + } } diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/TestSignalProtocol.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/TestSignalProtocol.java new file mode 100644 index 0000000000..51de4ae53a --- /dev/null +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/signal/TestSignalProtocol.java @@ -0,0 +1,78 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.tests.signal; + +import org.eclipse.net4j.signal.SignalProtocol; +import org.eclipse.net4j.signal.SignalReactor; +import org.eclipse.net4j.transport.Channel; +import org.eclipse.net4j.transport.Protocol; +import org.eclipse.net4j.transport.ProtocolFactory; +import org.eclipse.net4j.transport.Connector.Type; + +import org.eclipse.internal.net4j.transport.AbstractProtocolFactory; + +import java.util.Set; + +/** + * @author Eike Stepper + */ +public class TestSignalProtocol extends SignalProtocol +{ + public static final String PROTOCOL_ID = "signal.protocol"; + + public static final short SIGNAL1 = 1; + + public static final short SIGNAL2 = 2; + + public static final short SIGNAL3 = 3; + + public static final short SIGNAL4 = 4; + + public TestSignalProtocol(Channel channel) + { + super(channel); + } + + public String getProtocolID() + { + return PROTOCOL_ID; + } + + @Override + protected SignalReactor createSignalReactor(short signalID) + { + switch (signalID) + { + case SIGNAL1: + return new Signal1Reactor(); + } + + throw new IllegalArgumentException("Invalid signalID " + signalID); + } + + public static class Factory extends AbstractProtocolFactory + { + public String getID() + { + return PROTOCOL_ID; + } + + public Set<Type> getConnectorTypes() + { + return ProtocolFactory.SYMMETRIC; + } + + public Protocol createProtocol(Channel channel) + { + return new TestSignalProtocol(channel); + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java index f1dc579db4..cbf6991284 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java @@ -57,7 +57,7 @@ public abstract class AbstractConnector extends AbstractLifecycle implements Con /** * TODO synchronize on channels? */ - private List<ChannelImpl> channels = new ArrayList(); + private List<ChannelImpl> channels = new ArrayList(0); private State state = State.DISCONNECTED; @@ -336,14 +336,8 @@ public abstract class AbstractConnector extends AbstractLifecycle implements Con { ChannelImpl channel = new ChannelImpl(receiveExecutor); Protocol protocol = createProtocol(protocolID, channel); - if (protocol == null) - { - System.out.println(toString() + ": Opening channel without protocol"); - } - else - { - System.out.println(toString() + ": Opening channel with protocol " + protocolID); - } + System.out.println(toString() + ": Opening channel " + channelID + + (protocol == null ? " without protocol" : " with protocol " + protocolID)); channel.setChannelID(channelID); channel.setConnector(this); @@ -379,7 +373,7 @@ public abstract class AbstractConnector extends AbstractLifecycle implements Con { for (final ChannelImpl channel : channels) { - if (channel != NULL_CHANNEL) + if (channel != NULL_CHANNEL && channel.isActive()) { Queue<Buffer> bufferQueue = channel.getSendQueue(); result.add(bufferQueue); @@ -403,7 +397,6 @@ public abstract class AbstractConnector extends AbstractLifecycle implements Con } } - channels.add(NULL_CHANNEL); return (short)size; } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractProtocolFactory.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractProtocolFactory.java index ccd55fe90a..b3f3ae40a9 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractProtocolFactory.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractProtocolFactory.java @@ -18,6 +18,10 @@ import org.eclipse.net4j.transport.Connector.Type; */ public abstract class AbstractProtocolFactory implements ProtocolFactory { + public AbstractProtocolFactory() + { + } + public final boolean isForClients() { return getConnectorTypes().contains(Type.CLIENT); diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java index aa3e2ed908..1f04a58c39 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java @@ -182,16 +182,26 @@ public class BufferImpl implements Buffer public ByteBuffer startPutting(short channelID) { - if (state != State.INITIAL) + if (state == State.PUTTING) + { + if (channelID != this.channelID) + { + throw new IllegalArgumentException("channelID != this.channelID"); + } + } + else if (state != State.INITIAL) { throw new IllegalStateException("state == " + state); } + else + { + state = State.PUTTING; + this.channelID = channelID; - state = State.PUTTING; - this.channelID = channelID; + byteBuffer.clear(); + byteBuffer.position(BufferImpl.HEADER_SIZE); + } - byteBuffer.clear(); - byteBuffer.position(BufferImpl.HEADER_SIZE); return byteBuffer; } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java index f4cce359d2..6aaacc8f80 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java @@ -15,6 +15,9 @@ import org.eclipse.net4j.transport.BufferHandler; import org.eclipse.net4j.transport.BufferProvider; import org.eclipse.net4j.transport.Channel; import org.eclipse.net4j.transport.Connector; +import org.eclipse.net4j.util.concurrent.AsynchronousWorkSerializer; +import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer; +import org.eclipse.net4j.util.concurrent.WorkSerializer; import org.eclipse.net4j.util.lifecycle.AbstractLifecycle; import org.eclipse.internal.net4j.transport.BufferImpl.State; @@ -26,7 +29,7 @@ import java.util.concurrent.ExecutorService; /** * @author Eike Stepper */ -public class ChannelImpl extends AbstractLifecycle implements Channel, BufferProvider, Runnable +public class ChannelImpl extends AbstractLifecycle implements Channel, BufferProvider { private short channelID = BufferImpl.NO_CHANNEL; @@ -38,22 +41,11 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro */ private BufferHandler receiveHandler; - /** - * An optional executor that is used to process the {@link #receiveQueue} - * instead of the current thread. If not <code>null</code> the sender and - * the receiver peers become decoupled. - * <p> - */ private ExecutorService receiveExecutor; - private Occupation receiveExecutorOccupation = new Occupation(); - - /** - * TODO Optimize for embedded transport - */ - private Queue<Buffer> receiveQueue = new ConcurrentLinkedQueue(); + private WorkSerializer receiveSerializer; - private Queue<Buffer> sendQueue = new ConcurrentLinkedQueue(); + private Queue<Buffer> sendQueue; public ChannelImpl(ExecutorService receiveExecutor) { @@ -105,16 +97,6 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro return sendQueue; } - public ExecutorService getReceiveExecutor() - { - return receiveExecutor; - } - - public Queue<Buffer> getReceiveQueue() - { - return receiveQueue; - } - public BufferHandler getReceiveHandler() { return receiveHandler; @@ -125,6 +107,11 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro this.receiveHandler = receiveHandler; } + public ExecutorService getReceiveExecutor() + { + return receiveExecutor; + } + public void close() { deactivate(); @@ -148,7 +135,7 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro connector.multiplexBuffer(this); } - public void handleBufferFromMultiplexer(Buffer buffer) + public void handleBufferFromMultiplexer(final Buffer buffer) { if (receiveHandler == null) { @@ -157,46 +144,13 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro return; } - if (receiveExecutor == null) - { - // Bypass the receiveQueue - receiveHandler.handleBuffer(buffer); - return; - } - - receiveQueue.add(buffer); - - // isOccupied can (and must) be called unsynchronized here - if (receiveExecutorOccupation.isOccupied()) - { - return; - } - - synchronized (receiveExecutorOccupation) + receiveSerializer.addWork(new Runnable() { - receiveExecutorOccupation.setOccupied(true); - } - - System.out.println(toString() + ": Spawning new receive executor"); - receiveExecutor.execute(this); - } - - /** - * Executed in the context of the {@link #receiveExecutor}. - * <p> - */ - public void run() - { - synchronized (receiveExecutorOccupation) - { - Buffer buffer; - while ((buffer = receiveQueue.poll()) != null) + public void run() { receiveHandler.handleBuffer(buffer); } - - receiveExecutorOccupation.setOccupied(false); - } + }); } @Override @@ -221,27 +175,26 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro } @Override - protected void onDeactivate() throws Exception + protected void onActivate() throws Exception { - sendQueue.clear(); - super.onDeactivate(); - } - - /** - * @author Eike Stepper - */ - private static class Occupation - { - private boolean occupied = false; - - public boolean isOccupied() + super.onActivate(); + sendQueue = new ConcurrentLinkedQueue(); + if (receiveExecutor == null) { - return occupied; + receiveSerializer = new SynchronousWorkSerializer(); } - - public void setOccupied(boolean occupied) + else { - this.occupied = occupied; + receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor); } } + + @Override + protected void onDeactivate() throws Exception + { + receiveSerializer = null; + sendQueue.clear(); + sendQueue = null; + super.onDeactivate(); + } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/Serializer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/MessageDeserializer.java index fb6e4f7594..30b5dbc593 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/Serializer.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/MessageDeserializer.java @@ -11,12 +11,12 @@ package org.eclipse.net4j.message; /** - * TODO The {@link Serializer} class. + * TODO The {@link MessageDeserializer} class. * <p> * * @author Eike Stepper */ -public interface Serializer +public interface MessageDeserializer { } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/Deserializer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/MessageSerializer.java index b1a50a5d9c..95dc915f75 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/Deserializer.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/message/MessageSerializer.java @@ -11,12 +11,12 @@ package org.eclipse.net4j.message; /** - * TODO The {@link Deserializer} class. + * TODO The {@link MessageSerializer} class. * <p> * * @author Eike Stepper */ -public interface Deserializer +public interface MessageSerializer { } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java index 9fffa816ed..7bc430518e 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java @@ -10,29 +10,106 @@ **************************************************************************/ package org.eclipse.net4j.signal; +import org.eclipse.net4j.util.stream.BufferInputStream; +import org.eclipse.net4j.util.stream.BufferOutputStream; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + /** * @author Eike Stepper */ -public abstract class Signal +public abstract class Signal implements Runnable { private SignalProtocol protocol; private int correlationID; + private BufferInputStream inputStream; + + private BufferOutputStream outputStream; + + private DataInputStream dataInputStream; + + private DataOutputStream dataOutputStream; + protected Signal() { } - protected SignalProtocol getProtocol() + protected final SignalProtocol getProtocol() { return protocol; } - protected int getCorrelationID() + protected final int getCorrelationID() { return correlationID; } + protected final BufferInputStream getInputStream() + { + return inputStream; + } + + protected final BufferOutputStream getOutputStream() + { + return outputStream; + } + + protected DataInputStream getDataInputStream() + { + if (dataInputStream == null) + { + dataInputStream = new DataInputStream(inputStream); + } + + return dataInputStream; + } + + protected DataOutputStream getDataOutputStream() + { + if (dataOutputStream == null) + { + dataOutputStream = new DataOutputStream(outputStream); + } + + return dataOutputStream; + } + + protected void writeByteArray(byte[] bytes) throws IOException + { + getDataOutputStream().writeInt(bytes.length); + getDataOutputStream().write(bytes); + } + + protected byte[] readByteArray() throws IOException + { + int length = getDataInputStream().readInt(); + byte[] bytes = new byte[length]; + getDataInputStream().read(bytes); + return bytes; + } + + public final void run() + { + try + { + execute(inputStream, outputStream); + } + catch (Exception ex) + { + ex.printStackTrace(); + } + finally + { + getProtocol().stopSignal(this); + } + } + + protected abstract void execute(BufferInputStream in, BufferOutputStream out) throws Exception; + protected abstract short getSignalID(); void setProtocol(SignalProtocol protocol) @@ -44,4 +121,14 @@ public abstract class Signal { this.correlationID = correlationID; } + + void setInputStream(BufferInputStream inputStream) + { + this.inputStream = inputStream; + } + + void setOutputStream(BufferOutputStream outputStream) + { + this.outputStream = outputStream; + } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java new file mode 100644 index 0000000000..6b5beb4a28 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java @@ -0,0 +1,71 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.transport.BufferHandler; +import org.eclipse.net4j.transport.Channel; + +/** + * @author Eike Stepper + */ +public abstract class SignalActor<RESULT> extends Signal +{ + private boolean terminated; + + private RESULT result; + + protected SignalActor(Channel channel) + { + SignalProtocol protocol = extractSignalProtocol(channel); + setProtocol(protocol); + setCorrelationID(protocol.getNextCorrelationID()); + } + + public RESULT start() throws Exception + { + if (terminated) + { + throw new IllegalStateException("Terminated"); + } + + getProtocol().startSignal(this); + terminated = true; + return result; + } + + @Override + public String toString() + { + return "SignalActor[" + getSignalID() + ", " + getProtocol() + ", correlation=" + + getCorrelationID() + (terminated ? ", SENT" : "") + "]"; + } + + protected void setResult(RESULT result) + { + this.result = result; + } + + private static SignalProtocol extractSignalProtocol(Channel channel) + { + BufferHandler receiveHandler = channel.getReceiveHandler(); + if (receiveHandler == null) + { + throw new IllegalArgumentException("Channel has no protocol"); + } + + if (!(receiveHandler instanceof SignalProtocol)) + { + throw new IllegalArgumentException("Channel has no signal protocol"); + } + + return (SignalProtocol)receiveHandler; + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java index b65bd9cd18..2706d6c454 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java @@ -11,31 +11,71 @@ package org.eclipse.net4j.signal; import org.eclipse.net4j.transport.Buffer; +import org.eclipse.net4j.transport.BufferProvider; import org.eclipse.net4j.transport.Channel; +import org.eclipse.net4j.util.stream.BufferInputStream; +import org.eclipse.net4j.util.stream.BufferOutputStream; +import org.eclipse.net4j.util.stream.ChannelOutputStream; import org.eclipse.internal.net4j.transport.AbstractProtocol; +import org.eclipse.internal.net4j.transport.BufferUtil; +import org.eclipse.internal.net4j.transport.ChannelImpl; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; /** * @author Eike Stepper */ public abstract class SignalProtocol extends AbstractProtocol { - private int nextCorrelationID; + private ExecutorService executorService; - private Queue<Request> requestQueue = new ConcurrentLinkedQueue(); + private Map<Integer, Signal> signals = new ConcurrentHashMap(); - public SignalProtocol(Channel channel) + private int nextCorrelationID = 0; + + protected SignalProtocol(Channel channel, ExecutorService executorService) { super(channel); + + if (executorService == null) + { + throw new IllegalArgumentException("executorService == null"); + } + + this.executorService = executorService; + } + + protected SignalProtocol(Channel channel) + { + this(channel, ((ChannelImpl)channel).getReceiveExecutor()); } public void handleBuffer(Buffer buffer) { - // TODO Implement method SignalProtocol + ByteBuffer byteBuffer = buffer.getByteBuffer(); + int correlationID = byteBuffer.getInt(); + System.out.println("Received buffer for signal " + correlationID); + + Signal signal = signals.get(correlationID); + if (signal == null) + { + short signalID = byteBuffer.getShort(); + System.out.println("Got signal id " + signalID); + signal = createSignalReactor(signalID); + signal.setProtocol(this); + signal.setCorrelationID(correlationID); + signal.setInputStream(createInputStream()); + signal.setOutputStream(createOutputStream(correlationID, signalID)); + signals.put(correlationID, signal); + executorService.execute(signal); + } + + signal.getInputStream().handleBuffer(buffer); } @Override @@ -44,12 +84,27 @@ public abstract class SignalProtocol extends AbstractProtocol return "SignalProtocol[" + getProtocolID() + ", " + getChannel() + "]"; } - protected abstract Indication createIndication(short signalID); + protected abstract SignalReactor createSignalReactor(short signalID); + + void startSignal(SignalActor signalActor) + { + if (signalActor.getProtocol() != this) + { + throw new IllegalArgumentException("signalActor.getProtocol() != this"); + } + + short signalID = signalActor.getSignalID(); + int correlationID = signalActor.getCorrelationID(); + signalActor.setInputStream(createInputStream()); + signalActor.setOutputStream(createOutputStream(correlationID, signalID)); + signals.put(correlationID, signalActor); + signalActor.run(); + } - Object sendRequest(Request request) + void stopSignal(Signal signal) { - // TODO Implement method SignalProtocol - return null; + int correlationID = signal.getCorrelationID(); + signals.remove(correlationID); } int getNextCorrelationID() @@ -67,4 +122,61 @@ public abstract class SignalProtocol extends AbstractProtocol return correlationID; } + + BufferInputStream createInputStream() + { + return new BufferInputStream(); + } + + BufferOutputStream createOutputStream(int correlationID, short signalID) + { + return new SignalOutputStream(correlationID, signalID); + } + + class SignalInputStream extends BufferInputStream + { + public SignalInputStream() + { + } + } + + class SignalOutputStream extends ChannelOutputStream + { + public SignalOutputStream(final int correlationID, final short signalID) + { + super(getChannel(), new BufferProvider() + { + private BufferProvider delegate = BufferUtil.getBufferProvider(getChannel()); + + private boolean firstBuffer = true; + + public short getBufferCapacity() + { + return delegate.getBufferCapacity(); + } + + public Buffer provideBuffer() + { + Buffer buffer = delegate.provideBuffer(); + ByteBuffer byteBuffer = buffer.startPutting(getChannel().getChannelID()); + + System.out.println("Providing buffer for signal " + correlationID); + byteBuffer.putInt(correlationID); + if (firstBuffer) + { + System.out.println("Setting signal id " + signalID); + byteBuffer.putShort(signalID); + firstBuffer = false; + } + + return buffer; + } + + public void retainBuffer(Buffer buffer) + { + delegate.retainBuffer(buffer); + } + }); + } + } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java index a91fefe844..7d5e743871 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java @@ -15,18 +15,16 @@ import java.io.InputStream; /** * @author Eike Stepper */ -public abstract class Indication extends Signal +public abstract class SignalReactor extends Signal { - protected Indication() + protected SignalReactor() { } @Override public String toString() { - return "Indication[" + getSignalID() + ", " + getProtocol() + ", correlation=" + return "SignalReactor[" + getSignalID() + ", " + getProtocol() + ", correlation=" + getCorrelationID() + "]"; } - - protected abstract void indicating(InputStream stream) throws Exception; } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java new file mode 100644 index 0000000000..479b58a64a --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java @@ -0,0 +1,127 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.util.concurrent; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; + +/** + * @author Eike Stepper + */ +public class AsynchronousWorkSerializer implements WorkSerializer, Runnable +{ + private ExecutorService executorService; + + private Queue<Runnable> workQueue; + + private Occupation occupation = new Occupation(); + + public AsynchronousWorkSerializer(ExecutorService executorService, Queue<Runnable> workQueue) + { + if (executorService == null) + { + throw new IllegalArgumentException("executorService == null"); + } + + this.executorService = executorService; + this.workQueue = workQueue; + } + + public AsynchronousWorkSerializer(ExecutorService executorService) + { + this(executorService, new ConcurrentLinkedQueue()); + } + + public ExecutorService getExecutorService() + { + return executorService; + } + + public void addWork(Runnable work) + { + workQueue.add(work); + + // isOccupied can (and must) be called unsynchronized here + if (!occupation.isOccupied()) + { + synchronized (occupation) + { + occupation.setOccupied(true); + } + + System.out.println(toString() + ": Notifying executor service"); + executorService.execute(this); + } + } + + /** + * Executed in the context of the + * {@link #getExecutorService() executor service}. + * <p> + */ + public void run() + { + synchronized (occupation) + { + Runnable work; + while (occupation.isOccupied() && (work = workQueue.poll()) != null) + { + try + { + work.run(); + } + catch (RuntimeException ex) + { + ex.printStackTrace(); + } + } + + occupation.setOccupied(false); + } + } + + public void dispose() + { + if (occupation.isOccupied()) + { + occupation.setOccupied(false); + } + + workQueue.clear(); + workQueue = null; + executorService = null; + } + + @Override + public String toString() + { + return "AsynchronousWorkSerializer[" + executorService + "]"; + } + + /** + * @author Eike Stepper + */ + private static class Occupation + { + private boolean occupied = false; + + public boolean isOccupied() + { + return occupied; + } + + public void setOccupied(boolean occupied) + { + this.occupied = occupied; + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/Asynchronizer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/SynchronousWorkSerializer.java index 60840f0777..d727aaf974 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/Asynchronizer.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/SynchronousWorkSerializer.java @@ -13,13 +13,24 @@ package org.eclipse.net4j.util.concurrent; /** * @author Eike Stepper */ -public class Asynchronizer +public class SynchronousWorkSerializer implements WorkSerializer { - public Asynchronizer() + public SynchronousWorkSerializer() { } public void addWork(Runnable work) { + work.run(); + } + + public void dispose() + { + } + + @Override + public String toString() + { + return "SynchronousWorkSerializer"; } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/Worker.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java index 0a1d268b18..4539855c14 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/Worker.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java @@ -13,13 +13,9 @@ package org.eclipse.net4j.util.concurrent; /** * @author Eike Stepper */ -public class Worker +public interface WorkSerializer { - public Worker() - { - } + public void addWork(Runnable work); - public void addWork(Runnable work) - { - } + public void dispose(); } |