summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2007-12-30 06:04:10 (EST)
committerEike Stepper2007-12-30 06:04:10 (EST)
commit4da7e32a517d9955ceb35e1bca0e8c208980bafb (patch)
tree3bb53e94589f897c1046e92448ced3ca011637b0
parent1177b5c416bc00240a67275cba7c0b1a73391905 (diff)
downloadcdo-4da7e32a517d9955ceb35e1bca0e8c208980bafb.zip
cdo-4da7e32a517d9955ceb35e1bca0e8c208980bafb.tar.gz
cdo-4da7e32a517d9955ceb35e1bca0e8c208980bafb.tar.bz2
[213782] Transaction DeadLock
https://bugs.eclipse.org/bugs/show_bug.cgi?id=213782
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java95
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPSelector.java1
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java4
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java23
4 files changed, 52 insertions, 71 deletions
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
index 3a028fa..f42056f 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java
@@ -21,6 +21,7 @@ import org.eclipse.net4j.protocol.IProtocol;
import org.eclipse.net4j.tcp.ITCPConnector;
import org.eclipse.net4j.tcp.ITCPSelector;
import org.eclipse.net4j.tcp.ITCPSelectorListener;
+import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.io.IOUtil;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.security.INegotiationContext;
@@ -32,8 +33,9 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
-import java.util.List;
import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* @author Eike Stepper
@@ -48,6 +50,8 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
private SelectionKey selectionKey;
+ private BlockingQueue<InternalChannel> writeQueue = new LinkedBlockingQueue<InternalChannel>();
+
private IBuffer inputBuffer;
private ControlChannel controlChannel;
@@ -108,16 +112,6 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
return "tcp://" + host + ":" + port;
}
- /**
- * Called by an {@link IChannel} each time a new buffer is available for multiplexing. This or another buffer can be
- * dequeued from the outputQueue of the {@link IChannel}.
- */
- public void multiplexChannel(IChannel channel)
- {
- checkSelectionKey();
- selector.orderWriteInterest(selectionKey, isClient(), true);
- }
-
public void handleRegistration(SelectionKey selectionKey)
{
this.selectionKey = selectionKey;
@@ -192,43 +186,67 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
}
}
+ /**
+ * Called by an {@link IChannel} each time a new buffer is available for multiplexing. This or another buffer can be
+ * dequeued from the outputQueue of the {@link IChannel}.
+ */
+ public void multiplexChannel(IChannel channel)
+ {
+ synchronized (writeQueue)
+ {
+ boolean firstChannel = writeQueue.isEmpty();
+
+ try
+ {
+ writeQueue.put((InternalChannel)channel);
+ }
+ catch (InterruptedException ex)
+ {
+ throw WrappedException.wrap(ex);
+ }
+
+ if (firstChannel)
+ {
+ checkSelectionKey();
+ selector.orderWriteInterest(selectionKey, isClient(), true);
+ }
+ }
+ }
+
public void handleWrite(ITCPSelector selector, SocketChannel socketChannel)
{
try
{
- boolean moreToWrite = false;
- for (Queue<IBuffer> bufferQueue : getChannelBufferQueues())
+ synchronized (writeQueue)
{
- IBuffer buffer = bufferQueue.peek();
- if (buffer != null)
+ InternalChannel channel = writeQueue.peek();
+ if (channel != null)
{
- if (buffer.write(socketChannel))
+ Queue<IBuffer> bufferQueue = channel.getSendQueue();
+ if (bufferQueue != null)
{
- bufferQueue.poll();
- buffer.release();
-
- if (!moreToWrite)
+ IBuffer buffer = bufferQueue.peek();
+ if (buffer != null)
{
- moreToWrite = !bufferQueue.isEmpty();
+ if (buffer.write(socketChannel))
+ {
+ writeQueue.poll();
+ bufferQueue.poll();
+ buffer.release();
+ }
}
}
- else
- {
- moreToWrite = true;
- break;
- }
}
- }
- if (!moreToWrite)
- {
- checkSelectionKey();
- selector.orderWriteInterest(selectionKey, isClient(), false);
+ if (writeQueue.isEmpty())
+ {
+ checkSelectionKey();
+ selector.orderWriteInterest(selectionKey, isClient(), false);
+ }
}
}
catch (NullPointerException ignore)
{
- ;
}
catch (ClosedChannelException ex)
{
@@ -242,19 +260,6 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I
}
@Override
- protected List<Queue<IBuffer>> getChannelBufferQueues()
- {
- List<Queue<IBuffer>> queues = super.getChannelBufferQueues();
- Queue<IBuffer> controlQueue = controlChannel.getSendQueue();
- if (!controlQueue.isEmpty())
- {
- queues.add(controlQueue);
- }
-
- return queues;
- }
-
- @Override
protected void registerChannelWithPeer(int channelID, short channelIndex, IProtocol protocol)
throws ConnectorException
{
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPSelector.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPSelector.java
index f712861..9e35595 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPSelector.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPSelector.java
@@ -136,7 +136,6 @@ public class TCPSelector extends Lifecycle implements ITCPSelector, Runnable
{
return "INTEREST WRITE " + selectionKey.channel() + " = " + on;
}
-
});
}
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java
index a6976c9..f9fd894 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/SignalTest.java
@@ -19,6 +19,7 @@ import org.eclipse.net4j.tests.signal.TestSignalClientProtocolFactory;
import org.eclipse.net4j.tests.signal.TestSignalProtocol;
import org.eclipse.net4j.tests.signal.TestSignalServerProtocolFactory;
import org.eclipse.net4j.util.container.IManagedContainer;
+import org.eclipse.net4j.util.om.OMPlatform;
import java.util.Arrays;
@@ -57,10 +58,9 @@ public class SignalTest extends AbstractTransportTest
public void testAsync() throws Exception
{
startTransport();
- // OMPlatform.INSTANCE.setDebugging(false);
+ OMPlatform.INSTANCE.setDebugging(false);
IChannel channel = getConnector().openChannel(TestSignalProtocol.PROTOCOL_NAME, null);
String data = TinyData.getText();
- // String huge = HugeData.getText();
for (int i = 0; i < 10000; i++)
{
msg("Loop " + i);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java
index 7b8a7a9..e1ef6d7 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java
@@ -10,7 +10,6 @@
**************************************************************************/
package org.eclipse.internal.net4j.connector;
-import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.channel.IChannelMultiplexer;
@@ -51,7 +50,6 @@ import org.eclipse.internal.net4j.protocol.ServerProtocolFactory;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
-import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -478,27 +476,6 @@ public abstract class Connector extends Container<IChannel> implements IConnecto
return nextChannelID++;
}
- protected List<Queue<IBuffer>> getChannelBufferQueues()
- {
- final List<Queue<IBuffer>> result = new ArrayList<Queue<IBuffer>>(channels.size());
- channelsLock.read(new Runnable()
- {
- public void run()
- {
- for (final InternalChannel channel : channels)
- {
- if (channel != null && channel.isActive())
- {
- Queue<IBuffer> bufferQueue = channel.getSendQueue();
- result.add(bufferQueue);
- }
- }
- }
- });
-
- return result;
- }
-
protected short findFreeChannelIndex()
{
return channelsLock.read(new Callable<Short>()