summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2007-09-17 12:13:09 (EDT)
committerEike Stepper2007-09-17 12:13:09 (EDT)
commit228b4ccbfdb7b87b83f151ec6ce0035488fc9372 (patch)
treeaa5372fd3626e49b500706a9439bb8b7bd360dfb
parentb1ce98594fd9e3da1efd702646322374c9f85f43 (diff)
downloadcdo-228b4ccbfdb7b87b83f151ec6ce0035488fc9372.zip
cdo-228b4ccbfdb7b87b83f151ec6ce0035488fc9372.tar.gz
cdo-228b4ccbfdb7b87b83f151ec6ce0035488fc9372.tar.bz2
[201366] Random deadlocks
https://bugs.eclipse.org/bugs/show_bug.cgi?id=201366
-rw-r--r--plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMConnector.java7
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java44
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java14
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java11
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java39
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/IChannel.java5
6 files changed, 73 insertions, 47 deletions
diff --git a/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMConnector.java b/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMConnector.java
index 676d2fb..b3d744b 100644
--- a/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMConnector.java
+++ b/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMConnector.java
@@ -77,14 +77,15 @@ public abstract class JVMConnector extends Connector
}
@Override
- protected void registerChannelWithPeer(short channelIndex, IProtocol protocol) throws ConnectorException
+ protected void registerChannelWithPeer(int channelID, short channelIndex, IProtocol protocol)
+ throws ConnectorException
{
try
{
- Channel channel = getPeer().createChannel(channelIndex, protocol);
+ Channel channel = getPeer().createChannel(channelID, channelIndex, protocol);
if (channel == null)
{
- throw new ConnectorException("Failed to register channel with peer"); //$NON-NLS-1$
+ throw new ConnectorException("Failed to register channel with peer");
}
channel.activate();
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java
index 76b879d..e4252cd 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
*/
public final class ControlChannel extends Channel
{
- public static final short CONTROL_CHANNEL_ID = -1;
+ public static final short CONTROL_CHANNEL_INDEX = -1;
public static final long REGISTRATION_TIMEOUT = 500000;
@@ -45,10 +45,10 @@ public final class ControlChannel extends Channel
private SynchronizingCorrelator<Short, Boolean> registrations = new SynchronizingCorrelator<Short, Boolean>();
- public ControlChannel(TCPConnector connector)
+ public ControlChannel(int channelID, TCPConnector connector)
{
- super(connector.getReceiveExecutor());
- setChannelIndex(CONTROL_CHANNEL_ID);
+ super(channelID, connector.getReceiveExecutor());
+ setChannelIndex(CONTROL_CHANNEL_INDEX);
setConnector(connector);
}
@@ -58,7 +58,13 @@ public final class ControlChannel extends Channel
return true;
}
- public boolean registerChannel(short channelIndex, IProtocol protocol)
+ @Override
+ public TCPConnector getConnector()
+ {
+ return (TCPConnector)super.getConnector();
+ }
+
+ public boolean registerChannel(int channelID, short channelIndex, IProtocol protocol)
{
if (TRACER.isEnabled())
{
@@ -69,8 +75,9 @@ public final class ControlChannel extends Channel
ISynchronizer<Boolean> registration = registrations.correlate(channelIndex);
IBuffer buffer = provideBuffer();
- ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_ID);
+ ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_INDEX);
byteBuffer.put(OPCODE_REGISTRATION);
+ byteBuffer.putInt(channelID);
byteBuffer.putShort(channelIndex);
BufferUtil.putUTF8(byteBuffer, protocol == null ? null : protocol.getType());
handleBuffer(buffer);
@@ -78,7 +85,7 @@ public final class ControlChannel extends Channel
return registration.get(REGISTRATION_TIMEOUT);
}
- public void deregisterChannel(short channelIndex)
+ public void deregisterChannel(int channelID, short channelIndex)
{
if (TRACER.isEnabled())
{
@@ -88,8 +95,9 @@ public final class ControlChannel extends Channel
assertValidChannelIndex(channelIndex);
IBuffer buffer = provideBuffer();
- ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_ID);
+ ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_INDEX);
byteBuffer.put(OPCODE_DEREGISTRATION);
+ byteBuffer.putInt(channelID);
byteBuffer.putShort(channelIndex);
handleBuffer(buffer);
}
@@ -105,6 +113,7 @@ public final class ControlChannel extends Channel
{
case OPCODE_REGISTRATION:
{
+ int channelID = byteBuffer.getInt();
short channelIndex = byteBuffer.getShort();
assertValidChannelIndex(channelIndex);
boolean success = true;
@@ -113,7 +122,7 @@ public final class ControlChannel extends Channel
{
byte[] handlerFactoryUTF8 = BufferUtil.getByteArray(byteBuffer);
String protocolID = BufferUtil.fromUTF8(handlerFactoryUTF8);
- Channel channel = ((TCPConnector)getConnector()).createChannel(channelIndex, protocolID);
+ Channel channel = getConnector().createChannel(channelID, channelIndex, protocolID);
if (channel != null)
{
channel.activate();
@@ -143,18 +152,13 @@ public final class ControlChannel extends Channel
case OPCODE_DEREGISTRATION:
{
+ int channelID = byteBuffer.getInt();
short channelIndex = byteBuffer.getShort();
- if (channelIndex != CONTROL_CHANNEL_ID)
+ if (channelIndex != CONTROL_CHANNEL_INDEX)
{
try
{
- getConnector().inverseRemoveChannel(channelIndex);
- // Channel channel =
- // ((TCPConnector)getConnector()).getChannel(channelIndex);
- // if (channel != null)
- // {
- // channel.deactivate();
- // }
+ getConnector().inverseRemoveChannel(channelID, channelIndex);
}
catch (Exception ex)
{
@@ -167,7 +171,7 @@ public final class ControlChannel extends Channel
default:
OM.LOG.error("Invalid opcode: " + opcode); //$NON-NLS-1$
- ((TCPConnector)getConnector()).deactivate();
+ getConnector().deactivate();
}
}
finally
@@ -185,7 +189,7 @@ public final class ControlChannel extends Channel
private void sendStatus(byte opcode, short channelIndex, boolean status)
{
IBuffer buffer = provideBuffer();
- ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_ID);
+ ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_INDEX);
byteBuffer.put(opcode);
byteBuffer.putShort(channelIndex);
byteBuffer.put(status ? SUCCESS : FAILURE);
@@ -194,7 +198,7 @@ public final class ControlChannel extends Channel
private void assertValidChannelIndex(short channelIndex)
{
- if (channelIndex <= CONTROL_CHANNEL_ID)
+ if (channelIndex <= CONTROL_CHANNEL_INDEX)
{
throw new IllegalArgumentException("channelIndex <= CONTROL_CHANNEL_ID"); //$NON-NLS-1$
}
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
index da8e40d..20ad03e 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
@@ -166,7 +166,8 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
if (byteBuffer != null)
{
short channelIndex = inputBuffer.getChannelIndex();
- Channel channel = channelIndex == ControlChannel.CONTROL_CHANNEL_ID ? controlChannel : getChannel(channelIndex);
+ Channel channel = channelIndex == ControlChannel.CONTROL_CHANNEL_INDEX ? controlChannel
+ : getChannel(channelIndex);
if (channel != null)
{
channel.handleBufferFromMultiplexer(inputBuffer);
@@ -258,11 +259,12 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
}
@Override
- protected void registerChannelWithPeer(short channelIndex, IProtocol protocol) throws ConnectorException
+ protected void registerChannelWithPeer(int channelID, short channelIndex, IProtocol protocol)
+ throws ConnectorException
{
try
{
- if (!controlChannel.registerChannel(channelIndex, protocol))
+ if (!controlChannel.registerChannel(channelID, channelIndex, protocol))
{
throw new ConnectorException("Failed to register channel with peer"); //$NON-NLS-1$
}
@@ -278,7 +280,7 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
}
@Override
- public void inverseRemoveChannel(short channelIndex)
+ public void inverseRemoveChannel(int channelID, short channelIndex)
{
try
{
@@ -311,7 +313,7 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
{
if (controlChannel != null && isConnected())
{
- controlChannel.deregisterChannel(channel.getChannelIndex());
+ controlChannel.deregisterChannel(channel.getChannelID(), channel.getChannelIndex());
}
return true;
@@ -339,7 +341,7 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
protected void doActivate() throws Exception
{
super.doActivate();
- controlChannel = new ControlChannel(this);
+ controlChannel = new ControlChannel(getNextChannelID(), this);
controlChannel.activate();
selector.registerAsync(socketChannel, this);
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java
index 1d8262b..87a5d67 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java
@@ -40,6 +40,8 @@ public class Channel extends Lifecycle implements IChannel, IBufferProvider
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CHANNEL, Channel.class);
+ private int channelID;
+
private short channelIndex = Buffer.NO_CHANNEL;
private Connector connector;
@@ -56,14 +58,15 @@ public class Channel extends Lifecycle implements IChannel, IBufferProvider
private Queue<IBuffer> sendQueue;
- public Channel(ExecutorService receiveExecutor)
+ public Channel(int channelID, ExecutorService receiveExecutor)
{
+ this.channelID = channelID;
this.receiveExecutor = receiveExecutor;
}
- public IChannelID getID()
+ public int getChannelID()
{
- return new ChannelIDImpl();
+ return channelID;
}
public short getChannelIndex()
@@ -254,6 +257,8 @@ public class Channel extends Lifecycle implements IChannel, IBufferProvider
/**
* @author Eike Stepper
*/
+ @SuppressWarnings("unused")
+ @Deprecated
private final class ChannelIDImpl implements IChannelID, Cloneable, Serializable
{
private static final long serialVersionUID = 1L;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java
index 3473373..3be9ff4 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java
@@ -74,6 +74,8 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
*/
private ExecutorService receiveExecutor;
+ private int nextChannelID;
+
private List<Channel> channels = new ArrayList<Channel>(0);
private RWLock channelsLock = new RWLock(2500);
@@ -350,9 +352,10 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
public IChannel openChannel(IProtocol protocol) throws ConnectorException
{
waitForConnection(Long.MAX_VALUE);
+ int channelID = getNextChannelID();
short channelIndex = findFreeChannelIndex();
- Channel channel = createChannel(channelIndex, protocol);
- registerChannelWithPeer(channelIndex, protocol);
+ Channel channel = createChannel(channelID, channelIndex, protocol);
+ registerChannelWithPeer(channelID, channelIndex, protocol);
try
{
@@ -370,15 +373,15 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
return channel;
}
- public Channel createChannel(short channelIndex, String protocolID)
+ public Channel createChannel(int channelID, short channelIndex, String protocolID)
{
IProtocol protocol = createProtocol(protocolID, null);
- return createChannel(channelIndex, protocol);
+ return createChannel(channelID, channelIndex, protocol);
}
- public Channel createChannel(short channelIndex, IProtocol protocol)
+ public Channel createChannel(int channelID, short channelIndex, IProtocol protocol)
{
- Channel channel = new Channel(receiveExecutor);
+ Channel channel = new Channel(channelID, receiveExecutor);
if (protocol != null)
{
protocol.setChannel(channel);
@@ -416,6 +419,11 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
});
}
+ protected int getNextChannelID()
+ {
+ return nextChannelID++;
+ }
+
protected List<Queue<IBuffer>> getChannelBufferQueues()
{
final List<Queue<IBuffer>> result = new ArrayList<Queue<IBuffer>>(channels.size());
@@ -437,7 +445,7 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
return result;
}
- private short findFreeChannelIndex()
+ protected short findFreeChannelIndex()
{
return channelsLock.read(new Callable<Short>()
{
@@ -540,14 +548,24 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
return removed;
}
- public void inverseRemoveChannel(short channelIndex)
+ public void inverseRemoveChannel(int channelID, short channelIndex)
{
try
{
Channel channel = getChannel(channelIndex);
if (channel != null)
{
- removeChannel(channel);
+ if (channel.getChannelID() != channelID)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Ignoring concurrent atempt to remove channel {0} (channelID={1}", channelIndex, channelID);
+ }
+ }
+ else
+ {
+ removeChannel(channel);
+ }
}
}
catch (RuntimeException ex)
@@ -669,7 +687,8 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
super.doDeactivate();
}
- protected abstract void registerChannelWithPeer(short channelIndex, IProtocol protocol) throws ConnectorException;
+ protected abstract void registerChannelWithPeer(int channelID, short channelIndex, IProtocol protocol)
+ throws ConnectorException;
/**
* @author Eike Stepper
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IChannel.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IChannel.java
index 28240d6..521e35b 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IChannel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IChannel.java
@@ -84,11 +84,6 @@ import org.eclipse.net4j.util.event.INotifier;
public interface IChannel extends IBufferHandler, INotifier
{
/**
- * Returns an identifier of this channel that is unique among all channels of all {@link IConnector}s.
- */
- public IChannelID getID();
-
- /**
* Returns the index of this channel within the array of channels returned from the
* {@link IConnector#getChannels() getChannels()} method of the connector of this channel.
*/