summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2007-12-30 04:36:20 (EST)
committerEike Stepper2007-12-30 04:36:20 (EST)
commit1177b5c416bc00240a67275cba7c0b1a73391905 (patch)
treea6f60d0efc84ec95670367291c0c95c99b7df3ed
parented5400be2be6c6462120549c0a09cfb0d0e42fda (diff)
downloadcdo-1177b5c416bc00240a67275cba7c0b1a73391905.zip
cdo-1177b5c416bc00240a67275cba7c0b1a73391905.tar.gz
cdo-1177b5c416bc00240a67275cba7c0b1a73391905.tar.bz2
[213782] Transaction DeadLock
https://bugs.eclipse.org/bugs/show_bug.cgi?id=213782
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/AllTests.java2
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TDD.java2
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionDeadLockTest.java (renamed from plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/RandomDeadLockTest.java)4
-rw-r--r--plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMConnector.java8
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java37
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java10
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/Channel.java74
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/InternalChannel.java34
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java46
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/protocol/Protocol.java48
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/Net4jUtil.java15
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java4
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/IConnector.java3
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocol.java11
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java13
15 files changed, 168 insertions, 143 deletions
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/AllTests.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/AllTests.java
index 2ca3826..a99c646 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/AllTests.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/AllTests.java
@@ -31,7 +31,7 @@ public class AllTests
suite.addTestSuite(CrossReferenceTest.class);
suite.addTestSuite(ChunkingTest.class);
suite.addTestSuite(ChunkingWithMEMTest.class);
- suite.addTestSuite(RandomDeadLockTest.class);
+ suite.addTestSuite(TransactionDeadLockTest.class);
suite.addTestSuite(PackageRegistryTest.class);
suite.addTestSuite(RevisionDeltaTest.class);
// TODO suite.addTestSuite(GeneratedEcoreTest.class);
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TDD.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TDD.java
index 4b7ad5c..5138c9a 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TDD.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TDD.java
@@ -21,7 +21,7 @@ import java.lang.reflect.Method;
*/
public class TDD
{
- private static final AbstractOMTest test = new RandomDeadLockTest();
+ private static final AbstractOMTest test = new TransactionDeadLockTest();
private static final String METHOD_NAME = "_testCreateManyTransaction";
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/RandomDeadLockTest.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionDeadLockTest.java
index 9640d34..6bd684b 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/RandomDeadLockTest.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionDeadLockTest.java
@@ -22,11 +22,11 @@ import org.eclipse.net4j.util.om.OMPlatform;
import org.eclipse.emf.ecore.resource.impl.ResourceSetImpl;
/**
- * @see https://bugs.eclipse.org/bugs/show_bug.cgi?id=201366
* @see https://bugs.eclipse.org/bugs/show_bug.cgi?id=213782
+ * @see https://bugs.eclipse.org/bugs/show_bug.cgi?id=201366
* @author Simon McDuff
*/
-public class RandomDeadLockTest extends AbstractCDOTest
+public class TransactionDeadLockTest extends AbstractCDOTest
{
@Override
protected void doSetUp() throws Exception
diff --git a/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMConnector.java b/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMConnector.java
index 1cc0baf..03a9637 100644
--- a/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMConnector.java
+++ b/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMConnector.java
@@ -16,7 +16,7 @@ import org.eclipse.net4j.connector.ConnectorException;
import org.eclipse.net4j.protocol.IProtocol;
import org.eclipse.net4j.util.security.INegotiationContext;
-import org.eclipse.internal.net4j.channel.Channel;
+import org.eclipse.internal.net4j.channel.InternalChannel;
import org.eclipse.internal.net4j.connector.Connector;
import java.util.Queue;
@@ -64,13 +64,13 @@ public abstract class JVMConnector extends Connector
public void multiplexChannel(IChannel localChannel)
{
short channelIndex = localChannel.getChannelIndex();
- Channel peerChannel = peer.getChannel(channelIndex);
+ InternalChannel peerChannel = peer.getChannel(channelIndex);
if (peerChannel == null)
{
throw new IllegalStateException("peerChannel == null"); //$NON-NLS-1$
}
- Queue<IBuffer> localQueue = ((Channel)localChannel).getSendQueue();
+ Queue<IBuffer> localQueue = ((InternalChannel)localChannel).getSendQueue();
IBuffer buffer = localQueue.poll();
buffer.flip();
peerChannel.handleBufferFromMultiplexer(buffer);
@@ -88,7 +88,7 @@ public abstract class JVMConnector extends Connector
{
try
{
- Channel channel = getPeer().createChannel(channelID, channelIndex, protocol);
+ InternalChannel channel = getPeer().createChannel(channelID, channelIndex, protocol);
if (channel == null)
{
throw new ConnectorException("Failed to register channel with peer");
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java
index 1f752d1..4dff2d4 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java
@@ -23,6 +23,7 @@ import org.eclipse.net4j.util.security.INegotiationContext.Receiver;
import org.eclipse.internal.net4j.buffer.BufferUtil;
import org.eclipse.internal.net4j.channel.Channel;
+import org.eclipse.internal.net4j.channel.InternalChannel;
import java.nio.ByteBuffer;
@@ -51,24 +52,17 @@ public final class ControlChannel extends Channel
private SynchronizingCorrelator<Short, Boolean> registrations = new SynchronizingCorrelator<Short, Boolean>();
- private TCPConnector connector;
-
public ControlChannel(int channelID, TCPConnector connector)
{
- super(channelID, connector.getBufferProvider(), connector, connector.getReceiveExecutor());
- this.connector = connector;
+ setChannelID(channelID);
setChannelIndex(CONTROL_CHANNEL_INDEX);
- }
-
- @Override
- public boolean isInternal()
- {
- return true;
+ setChannelMultiplexer(connector);
+ setReceiveExecutor(connector.getReceiveExecutor());
}
public TCPConnector getConnector()
{
- return connector;
+ return (TCPConnector)getChannelMultiplexer();
}
public boolean registerChannel(int channelID, short channelIndex, IProtocol protocol)
@@ -127,11 +121,11 @@ public final class ControlChannel extends Channel
case OPCODE_NEGOTIATION:
{
assertNegotiating();
- INegotiationContext negotiationContext = connector.getNegotiationContext();
+ INegotiationContext negotiationContext = getConnector().getNegotiationContext();
while (negotiationContext == null)
{
ConcurrencyUtil.sleep(20);
- negotiationContext = connector.getNegotiationContext();
+ negotiationContext = getConnector().getNegotiationContext();
}
Receiver receiver = negotiationContext.getReceiver();
@@ -151,7 +145,7 @@ public final class ControlChannel extends Channel
{
byte[] handlerFactoryUTF8 = BufferUtil.getByteArray(byteBuffer);
String protocolID = BufferUtil.fromUTF8(handlerFactoryUTF8);
- Channel channel = connector.createChannel(channelID, channelIndex, protocolID);
+ InternalChannel channel = getConnector().createChannel(channelID, channelIndex, protocolID);
if (channel != null)
{
channel.activate();
@@ -189,7 +183,7 @@ public final class ControlChannel extends Channel
{
try
{
- connector.inverseRemoveChannel(channelID, channelIndex);
+ getConnector().inverseRemoveChannel(channelID, channelIndex);
}
catch (Exception ex)
{
@@ -202,7 +196,7 @@ public final class ControlChannel extends Channel
default:
OM.LOG.error("Invalid opcode: " + opcode); //$NON-NLS-1$
- connector.deactivate();
+ getConnector().deactivate();
}
}
finally
@@ -227,18 +221,23 @@ public final class ControlChannel extends Channel
handleBuffer(buffer);
}
+ private IBuffer provideBuffer()
+ {
+ return getConnector().getBufferProvider().provideBuffer();
+ }
+
private void assertNegotiating()
{
- if (!connector.isNegotiating())
+ if (!getConnector().isNegotiating())
{
- connector.deactivate();
+ getConnector().deactivate();
throw new IllegalStateException("Connector is not negotiating");
}
}
private void assertConnected()
{
- if (!connector.isConnected())
+ if (!getConnector().isConnected())
{
throw new IllegalStateException("Connector is not connected");
}
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
index 92938ec..3a028fa 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
@@ -25,7 +25,7 @@ import org.eclipse.net4j.util.io.IOUtil;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.security.INegotiationContext;
-import org.eclipse.internal.net4j.channel.Channel;
+import org.eclipse.internal.net4j.channel.InternalChannel;
import org.eclipse.internal.net4j.connector.Connector;
import java.nio.ByteBuffer;
@@ -109,8 +109,8 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
}
/**
- * Called by {@link Channel} each time a new buffer is available for multiplexing. This or another buffer can be
- * dequeued from the outputQueue of the {@link Channel}.
+ * Called by an {@link IChannel} each time a new buffer is available for multiplexing. This or another buffer can be
+ * dequeued from the outputQueue of the {@link IChannel}.
*/
public void multiplexChannel(IChannel channel)
{
@@ -162,7 +162,7 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
if (byteBuffer != null)
{
short channelIndex = inputBuffer.getChannelIndex();
- Channel channel = channelIndex == ControlChannel.CONTROL_CHANNEL_INDEX ? controlChannel
+ InternalChannel channel = channelIndex == ControlChannel.CONTROL_CHANNEL_INDEX ? controlChannel
: getChannel(channelIndex);
if (channel != null)
{
@@ -280,7 +280,7 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
{
try
{
- Channel channel = getChannel(channelIndex);
+ InternalChannel channel = getChannel(channelIndex);
if (channel instanceof ControlChannel)
{
return;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/Channel.java
index d549835..b3c9df0 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/Channel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/Channel.java
@@ -13,8 +13,6 @@ package org.eclipse.internal.net4j.channel;
import org.eclipse.net4j.buffer.BufferState;
import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferHandler;
-import org.eclipse.net4j.buffer.IBufferProvider;
-import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.channel.IChannelMultiplexer;
import org.eclipse.net4j.internal.util.concurrent.QueueWorkerWorkSerializer;
import org.eclipse.net4j.internal.util.concurrent.SynchronousWorkSerializer;
@@ -34,38 +32,29 @@ import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
*/
-public class Channel extends Lifecycle implements IChannel, IBufferProvider
+public class Channel extends Lifecycle implements InternalChannel
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CHANNEL, Channel.class);
private int channelID;
- private short channelIndex = Buffer.NO_CHANNEL;
-
- // private Connector connector;
+ private IChannelMultiplexer channelMultiplexer;
- private IBufferProvider bufferProvider;
+ private short channelIndex = Buffer.NO_CHANNEL;
- private IChannelMultiplexer channelMultiplexer;
+ private ExecutorService receiveExecutor;
/**
* The external handler for buffers passed from the {@link #connector}.
*/
private IBufferHandler receiveHandler;
- private ExecutorService receiveExecutor;
-
private IWorkSerializer receiveSerializer;
private Queue<IBuffer> sendQueue;
- public Channel(int channelID, IBufferProvider bufferProvider, IChannelMultiplexer channelMultiplexer,
- ExecutorService receiveExecutor)
+ public Channel()
{
- this.channelID = channelID;
- this.bufferProvider = bufferProvider;
- this.channelMultiplexer = channelMultiplexer;
- this.receiveExecutor = receiveExecutor;
}
public int getChannelID()
@@ -73,6 +62,21 @@ public class Channel extends Lifecycle implements IChannel, IBufferProvider
return channelID;
}
+ public void setChannelID(int channelID)
+ {
+ this.channelID = channelID;
+ }
+
+ public IChannelMultiplexer getChannelMultiplexer()
+ {
+ return channelMultiplexer;
+ }
+
+ public void setChannelMultiplexer(IChannelMultiplexer channelMultiplexer)
+ {
+ this.channelMultiplexer = channelMultiplexer;
+ }
+
public short getChannelIndex()
{
return channelIndex;
@@ -88,34 +92,14 @@ public class Channel extends Lifecycle implements IChannel, IBufferProvider
this.channelIndex = channelIndex;
}
- // public Connector getConnector()
- // {
- // return connector;
- // }
- //
- // public void setConnector(Connector connector)
- // {
- // this.connector = connector;
- // }
-
- public short getBufferCapacity()
- {
- return bufferProvider.getBufferCapacity();
- }
-
- public IBuffer provideBuffer()
- {
- return bufferProvider.provideBuffer();
- }
-
- public void retainBuffer(IBuffer buffer)
+ public ExecutorService getReceiveExecutor()
{
- bufferProvider.retainBuffer(buffer);
+ return receiveExecutor;
}
- public Queue<IBuffer> getSendQueue()
+ public void setReceiveExecutor(ExecutorService receiveExecutor)
{
- return sendQueue;
+ this.receiveExecutor = receiveExecutor;
}
public IBufferHandler getReceiveHandler()
@@ -128,14 +112,9 @@ public class Channel extends Lifecycle implements IChannel, IBufferProvider
this.receiveHandler = receiveHandler;
}
- public ExecutorService getReceiveExecutor()
- {
- return receiveExecutor;
- }
-
- public boolean isInternal()
+ public Queue<IBuffer> getSendQueue()
{
- return false;
+ return sendQueue;
}
public void close()
@@ -207,7 +186,6 @@ public class Channel extends Lifecycle implements IChannel, IBufferProvider
{
super.doBeforeActivate();
checkState(channelIndex != Buffer.NO_CHANNEL, "channelIndex == NO_CHANNEL"); //$NON-NLS-1$
- checkState(bufferProvider, "bufferProvider"); //$NON-NLS-1$
checkState(channelMultiplexer, "channelMultiplexer"); //$NON-NLS-1$
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/InternalChannel.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/InternalChannel.java
new file mode 100644
index 0000000..192ecaf
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/InternalChannel.java
@@ -0,0 +1,34 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.internal.net4j.channel;
+
+import org.eclipse.net4j.buffer.IBuffer;
+import org.eclipse.net4j.channel.IChannel;
+import org.eclipse.net4j.util.lifecycle.ILifecycle;
+
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @author Eike Stepper
+ */
+public interface InternalChannel extends IChannel, ILifecycle.Introspection
+{
+ public void setChannelID(int channelID);
+
+ public void setChannelIndex(short channelIndex);
+
+ public ExecutorService getReceiveExecutor();
+
+ public Queue<IBuffer> getSendQueue();
+
+ public void handleBufferFromMultiplexer(IBuffer buffer);
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java
index 7b149f2..7b8a7a9 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java
@@ -44,6 +44,7 @@ import org.eclipse.net4j.util.security.INegotiator;
import org.eclipse.internal.net4j.bundle.OM;
import org.eclipse.internal.net4j.channel.Channel;
+import org.eclipse.internal.net4j.channel.InternalChannel;
import org.eclipse.internal.net4j.protocol.ClientProtocolFactory;
import org.eclipse.internal.net4j.protocol.ServerProtocolFactory;
@@ -77,15 +78,15 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
private IBufferProvider bufferProvider;
/**
- * An optional executor to be used by the {@link IChannel}s to process their {@link Channel#receiveQueue} instead of
- * the current thread. If not <code>null</code> the sender and the receiver peers become decoupled.
+ * An optional executor to be used by the {@link IChannel}s to process their receive queues instead of the current
+ * thread. If not <code>null</code> the sender and the receiver peers become decoupled.
* <p>
*/
private ExecutorService receiveExecutor;
private int nextChannelID;
- private List<Channel> channels = new ArrayList<Channel>(0);
+ private List<InternalChannel> channels = new ArrayList<InternalChannel>(0);
private RWLock channelsLock = new RWLock(2500);
@@ -349,7 +350,7 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
{
public void run()
{
- for (Channel channel : channels)
+ for (InternalChannel channel : channels)
{
if (channel != null)
{
@@ -398,7 +399,7 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
int channelID = getNextChannelID();
short channelIndex = findFreeChannelIndex();
- Channel channel = createChannel(channelID, channelIndex, protocol);
+ InternalChannel channel = createChannel(channelID, channelIndex, protocol);
registerChannelWithPeer(channelID, channelIndex, protocol);
try
@@ -417,15 +418,17 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
return channel;
}
- public Channel createChannel(int channelID, short channelIndex, String protocolID)
+ public InternalChannel createChannel(int channelID, short channelIndex, String protocolID)
{
IProtocol protocol = createProtocol(protocolID, null);
return createChannel(channelID, channelIndex, protocol);
}
- public Channel createChannel(int channelID, short channelIndex, IProtocol protocol)
+ public InternalChannel createChannel(int channelID, short channelIndex, IProtocol protocol)
{
- Channel channel = new Channel(channelID, bufferProvider, this, receiveExecutor);
+ InternalChannel channel = createChannel();
+ channel.setChannelID(channelID);
+
if (protocol != null)
{
protocol.setChannel(channel);
@@ -445,18 +448,25 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
}
channel.setChannelIndex(channelIndex);
- // channel.setConnector(this);
channel.setReceiveHandler(protocol);
channel.addListener(channelListener); // TODO remove?
addChannel(channel);
return channel;
}
- public Channel getChannel(final short channelIndex)
+ protected InternalChannel createChannel()
+ {
+ Channel channel = new Channel();
+ channel.setChannelMultiplexer(this);
+ channel.setReceiveExecutor(receiveExecutor);
+ return channel;
+ }
+
+ public InternalChannel getChannel(final short channelIndex)
{
- return channelsLock.read(new Callable<Channel>()
+ return channelsLock.read(new Callable<InternalChannel>()
{
- public Channel call() throws Exception
+ public InternalChannel call() throws Exception
{
return channels.get(channelIndex);
}
@@ -475,7 +485,7 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
{
public void run()
{
- for (final Channel channel : channels)
+ for (final InternalChannel channel : channels)
{
if (channel != null && channel.isActive())
{
@@ -509,7 +519,7 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
});
}
- protected void addChannel(final Channel channel)
+ protected void addChannel(final InternalChannel channel)
{
channelsLock.write(new Runnable()
{
@@ -576,7 +586,7 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
{
if (channelIndex < channels.size())
{
- Channel c = channels.get(channelIndex);
+ InternalChannel c = channels.get(channelIndex);
if (c != null && c.isActive())
{
throw ex;
@@ -596,7 +606,7 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
{
try
{
- Channel channel = getChannel(channelIndex);
+ InternalChannel channel = getChannel(channelIndex);
if (channel != null)
{
if (channel.getChannelID() != channelID)
@@ -649,6 +659,8 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
// Create protocol
String description = null;
IProtocol protocol = (IProtocol)factory.create(description);
+ protocol.setBufferProvider(bufferProvider);
+ protocol.setExecutorService(receiveExecutor);
if (infraStructure != null)
{
protocol.setInfraStructure(infraStructure);
@@ -725,7 +737,7 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
{
for (short i = 0; i < channels.size(); i++)
{
- Channel channel = channels.get(i);
+ InternalChannel channel = channels.get(i);
if (channel != null)
{
LifecycleUtil.deactivate(channel);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/protocol/Protocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/protocol/Protocol.java
index 13f4990..f61e838 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/protocol/Protocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/protocol/Protocol.java
@@ -10,21 +10,23 @@
**************************************************************************/
package org.eclipse.internal.net4j.protocol;
-import org.eclipse.net4j.Net4jUtil;
-import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.internal.util.lifecycle.Lifecycle;
import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.internal.net4j.channel.Channel;
+import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
*/
-public abstract class Protocol extends Lifecycle implements IProtocol, IBufferProvider
+public abstract class Protocol extends Lifecycle implements IProtocol
{
- private Channel channel;
+ private IChannel channel;
+
+ private IBufferProvider bufferProvider;
+
+ private ExecutorService executorService;
private Object infraStructure;
@@ -32,49 +34,53 @@ public abstract class Protocol extends Lifecycle implements IProtocol, IBufferPr
{
}
- public Channel getChannel()
+ public IChannel getChannel()
{
return channel;
}
public void setChannel(IChannel channel)
{
- this.channel = (Channel)channel;
+ this.channel = channel;
}
- public Object getInfraStructure()
+ public IBufferProvider getBufferProvider()
{
- return infraStructure;
+ return bufferProvider;
}
- public void setInfraStructure(Object infraStructure)
+ public void setBufferProvider(IBufferProvider bufferProvider)
{
- this.infraStructure = infraStructure;
+ this.bufferProvider = bufferProvider;
}
- public short getBufferCapacity()
+ public ExecutorService getExecutorService()
{
- return Net4jUtil.getBufferProvider(channel).getBufferCapacity();
+ return executorService;
}
- public IBuffer provideBuffer()
+ public void setExecutorService(ExecutorService executorService)
{
- return Net4jUtil.getBufferProvider(channel).provideBuffer();
+ this.executorService = executorService;
}
- public void retainBuffer(IBuffer buffer)
+ public Object getInfraStructure()
{
- Net4jUtil.getBufferProvider(channel).retainBuffer(buffer);
+ return infraStructure;
+ }
+
+ public void setInfraStructure(Object infraStructure)
+ {
+ this.infraStructure = infraStructure;
}
@Override
protected void doBeforeActivate() throws Exception
{
super.doBeforeActivate();
- if (channel == null)
- {
- throw new IllegalStateException("channel == null");
- }
+ checkState(channel, "channel");
+ checkState(bufferProvider, "bufferProvider");
+ checkState(executorService, "executorService");
}
@Override
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/Net4jUtil.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/Net4jUtil.java
index 948d51e..226f180 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/Net4jUtil.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/Net4jUtil.java
@@ -88,21 +88,6 @@ public final class Net4jUtil
return (IConnector)container.getElement(ConnectorFactory.PRODUCT_GROUP, factoryType, connectorDescription);
}
- public static IBufferProvider getBufferProvider(Object object)
- {
- if (object instanceof IBufferProvider)
- {
- return (IBufferProvider)object;
- }
-
- if (object == null)
- {
- throw new IllegalArgumentException("object == null"); //$NON-NLS-1$
- }
-
- throw new IllegalArgumentException("Unable to provide buffers: " + object); //$NON-NLS-1$
- }
-
public static IBufferProvider createBufferFactory(short bufferCapacity)
{
return new BufferFactory(bufferCapacity);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java
index 0c06d40..59b5df7 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java
@@ -20,7 +20,9 @@ import org.eclipse.net4j.util.event.INotifier;
* and virtual in the sense that it does not necessarily represent a single physical connection like a TCP socket
* connection. The underlying physical connection is represented by a {@link IConnector}.
* <p>
- * This interface is <b>not</b> intended to be implemented by clients.
+ * This interface is <b>not</b> intended to be implemented by clients. Providers of channels (for example for new
+ * physical connection types) have to subclass
+ * {@link org.eclipse.internal.net4j.channel.InternalChannel InternalChannel}.
* <p>
* <dt><b>Class Diagram:</b></dt>
* <dd><img src="doc-files/Channels.png" title="Diagram Buffers" border="0" usemap="#Channels.png"/></dd>
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/IConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/IConnector.java
index c85d71c..13cd67c 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/IConnector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/IConnector.java
@@ -32,8 +32,7 @@ import java.util.List;
* exchange {@link IBuffer}s.
* <p>
* This interface is <b>not</b> intended to be implemented by clients. Providers of connectors for new physical
- * connection types have to subclass {@link org.eclipse.internal.net4j.connector.Connector Connector} (see
- * {@link org.eclipse.internal.net4j.channel.Channel#setConnector(org.eclipse.internal.net4j.connector.Connector) Channel#setConnector}).
+ * connection types have to subclass {@link org.eclipse.internal.net4j.connector.Connector Connector}.
* <p>
* <dt><b>Class Diagram:</b></dt>
* <dd><img src="doc-files/Connectors.png" title="Diagram Connectors" border="0" usemap="Connectors.png"/></dd>
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocol.java
index 843b491..4c2e810 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocol.java
@@ -11,8 +11,11 @@
package org.eclipse.net4j.protocol;
import org.eclipse.net4j.buffer.IBufferHandler;
+import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.channel.IChannel;
+import java.util.concurrent.ExecutorService;
+
/**
* @author Eike Stepper
*/
@@ -27,4 +30,12 @@ public interface IProtocol extends IBufferHandler
public Object getInfraStructure();
public void setInfraStructure(Object infraStructure);
+
+ public IBufferProvider getBufferProvider();
+
+ public void setBufferProvider(IBufferProvider bufferProvider);
+
+ public ExecutorService getExecutorService();
+
+ public void setExecutorService(ExecutorService executorService);
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
index 673c149..52a9b1c 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
@@ -28,7 +28,6 @@ import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
@@ -77,11 +76,6 @@ public abstract class SignalProtocol extends Protocol
}
}
- public ExecutorService getExecutorService()
- {
- return getChannel().getReceiveExecutor();
- }
-
public boolean waitForSignals(long timeout)
{
synchronized (signals)
@@ -205,6 +199,11 @@ public abstract class SignalProtocol extends Protocol
return MessageFormat.format("SignalProtocol[{0}]", getType()); //$NON-NLS-1$
}
+ public String getType()
+ {
+ return null;
+ }
+
protected final SignalReactor createSignalReactor(short signalID)
{
checkActive();
@@ -296,7 +295,7 @@ public abstract class SignalProtocol extends Protocol
{
super(getChannel(), new IBufferProvider()
{
- private IBufferProvider delegate = org.eclipse.net4j.Net4jUtil.getBufferProvider(getChannel());
+ private IBufferProvider delegate = getBufferProvider();
private boolean firstBuffer = addSignalID;