diff options
6 files changed, 122 insertions, 113 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java index 76350c5eff..df9a3f790c 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java @@ -27,7 +27,6 @@ import org.eclipse.internal.net4j.transport.ChannelImpl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.List; import java.util.Queue; @@ -45,8 +44,6 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements private TCPSelector selector; - private SelectionKey selectionKey; - private Buffer inputBuffer; private ControlChannelImpl controlChannel; @@ -87,11 +84,6 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements return socketChannel; } - public SelectionKey getSelectionKey() - { - return selectionKey; - } - /** * Called by {@link ChannelImpl} each time a new buffer is available for * multiplexing. This or another buffer can be dequeued from the outputQueue @@ -99,7 +91,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements */ public void multiplexBuffer(Channel channel) { - TCPUtil.setWriteInterest(selectionKey, true); + selector.setWriteInterest(socketChannel, true); } public void handleConnect(TCPSelector selector, SocketChannel channel) @@ -118,8 +110,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements try { - TCPUtil.setConnectInterest(selectionKey, false); - TCPUtil.setReadInterest(selectionKey, true); + selector.setConnectInterest(socketChannel, false); setState(State.NEGOTIATING); } catch (Exception ex) @@ -131,13 +122,6 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements public void handleRead(TCPSelector selector, SocketChannel socketChannel) { - // TODO Is this needed? - if (!socketChannel.isConnected()) - { - deactivate(); - return; - } - try { if (inputBuffer == null) @@ -181,13 +165,6 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements public void handleWrite(TCPSelector selector, SocketChannel socketChannel) { - // TODO Is this needed? - if (!socketChannel.isConnected()) - { - deactivate(); - return; - } - try { boolean moreToWrite = false; @@ -216,9 +193,13 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements if (!moreToWrite) { - TCPUtil.setWriteInterest(selectionKey, false); + selector.setWriteInterest(socketChannel, false); } } + catch (NullPointerException ignore) + { + ; + } catch (ClosedChannelException ex) { deactivate(); @@ -282,10 +263,10 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements controlChannel = new ControlChannelImpl(this); controlChannel.activate(); - selectionKey = selector.register(socketChannel, this); + selector.register(socketChannel, this); if (getType() == Type.SERVER) { - TCPUtil.setConnectInterest(selectionKey, false); + selector.setConnectInterest(socketChannel, false); } } @@ -296,9 +277,9 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements try { - selectionKey.cancel(); + controlChannel.deactivate(); } - catch (RuntimeException ex) + catch (Exception ex) { if (exception == null) { @@ -307,7 +288,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements } finally { - selectionKey = null; + controlChannel = null; } try @@ -328,22 +309,6 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements try { - controlChannel.deactivate(); - } - catch (Exception ex) - { - if (exception == null) - { - exception = ex; - } - } - finally - { - controlChannel = null; - } - - try - { super.onDeactivate(); } catch (Exception ex) diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java index 705729320d..5716858969 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java @@ -72,9 +72,6 @@ public class ClientTCPConnectorImpl extends AbstractTCPConnector protected void onActivate() throws Exception { super.onActivate(); - SelectionKey selKey = getSelectionKey(); - selKey.interestOps(selKey.interestOps() | SelectionKey.OP_CONNECT); - InetAddress addr = InetAddress.getByName(host); InetSocketAddress sAddr = new InetSocketAddress(addr, port); getSocketChannel().connect(sAddr); diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java index b283cbfa80..d4d3ec96b0 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java @@ -34,7 +34,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.Channel; import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.SelectionKey; +import java.nio.channels.ClosedChannelException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -65,8 +65,6 @@ public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, B private ServerSocketChannel serverSocketChannel; - private SelectionKey selectionKey; - private Set<TCPConnector> acceptedConnectors = new HashSet(0); /** @@ -213,7 +211,7 @@ public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, B addConnector(socketChannel); } } - catch (ClosedByInterruptException ex) + catch (ClosedChannelException ex) { deactivate(); } @@ -327,7 +325,7 @@ public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, B serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(sAddr); - selectionKey = selector.register(serverSocketChannel, this); + selector.register(serverSocketChannel, this); } @Override @@ -338,43 +336,6 @@ public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, B LifecycleUtil.deactivate(connector); } - Exception exception = null; - - try - { - selectionKey.cancel(); - } - catch (RuntimeException ex) - { - if (exception == null) - { - exception = ex; - } - } - finally - { - selectionKey = null; - } - - try - { - serverSocketChannel.close(); - } - catch (RuntimeException ex) - { - if (exception == null) - { - exception = ex; - } - } - finally - { - serverSocketChannel = null; - } - - if (exception != null) - { - throw exception; - } + serverSocketChannel.close(); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java index 037a9f1cf0..6293ae35ea 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java @@ -27,6 +27,8 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; /** * @author Eike Stepper @@ -40,42 +42,113 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R private Selector selector; + private Queue<Runnable> pendingOperations = new ConcurrentLinkedQueue(); + private Thread thread; public TCPSelectorImpl() { } - public SelectionKey register(ServerSocketChannel channel, TCPSelectorListener.Passive listener) - throws ClosedChannelException + public void invokeLater(Runnable operation) + { + pendingOperations.add(operation); + selector.wakeup(); + } + + public void register(final ServerSocketChannel channel, final TCPSelectorListener.Passive listener) { if (listener == null) { throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$ } - if (TRACER.isEnabled()) + invokeLater(new Runnable() { - TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ - } + public void run() + { + if (TRACER.isEnabled()) + { + TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ + } - return channel.register(selector, SelectionKey.OP_ACCEPT, listener); + try + { + channel.register(selector, SelectionKey.OP_ACCEPT, listener); + } + catch (ClosedChannelException ignore) + { + ; + } + } + }); } - public SelectionKey register(SocketChannel channel, TCPSelectorListener.Active listener) - throws ClosedChannelException + public void register(final SocketChannel channel, final TCPSelectorListener.Active listener) { if (listener == null) { throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$ } - if (TRACER.isEnabled()) + invokeLater(new Runnable() { - TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ - } + public void run() + { + if (TRACER.isEnabled()) + { + TRACER.trace(TCPSelectorImpl.this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ + } - return channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener); + try + { + channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener); + } + catch (ClosedChannelException ignore) + { + ; + } + } + }); + } + + public void setConnectInterest(final SocketChannel channel, final boolean on) + { + invokeLater(new Runnable() + { + public void run() + { + // TODO Optimize this lookup away + SelectionKey selectionKey = channel.keyFor(selector); + TCPUtil.setConnectInterest(selectionKey, on); + } + }); + } + + public void setReadInterest(final SocketChannel channel, final boolean on) + { + invokeLater(new Runnable() + { + public void run() + { + // TODO Optimize this lookup away + SelectionKey selectionKey = channel.keyFor(selector); + TCPUtil.setReadInterest(selectionKey, on); + } + }); + } + + public void setWriteInterest(final SocketChannel channel, final boolean on) + { + invokeLater(new Runnable() + { + public void run() + { + // TODO Optimize this lookup away + SelectionKey selectionKey = channel.keyFor(selector); + TCPUtil.setWriteInterest(selectionKey, on); + } + }); } public void run() @@ -90,7 +163,13 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R try { - if (selector.select(SELECT_TIMEOUT) > 0) + Runnable operation; + while ((operation = pendingOperations.poll()) != null) + { + operation.run(); + } + + if (selector.select() > 0) { Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) @@ -106,6 +185,10 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R { ; // Do nothing } + catch (NullPointerException ignore) + { + ; // Do nothing + } catch (Exception ex) { Net4j.LOG.error(ex); diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java index a12b92982f..af664c0263 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java @@ -92,7 +92,6 @@ public final class TCPUtil } selectionKey.interestOps(newOps); - selectionKey.selector().wakeup(); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java index 80a4df9fb1..d49d869518 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java @@ -10,8 +10,6 @@ **************************************************************************/ package org.eclipse.net4j.transport.tcp; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -20,9 +18,15 @@ import java.nio.channels.SocketChannel; */ public interface TCPSelector { - public SelectionKey register(ServerSocketChannel channel, TCPSelectorListener.Passive listener) - throws ClosedChannelException; + public void invokeLater(Runnable operation); - public SelectionKey register(SocketChannel channel, TCPSelectorListener.Active listener) - throws ClosedChannelException; + public void register(ServerSocketChannel channel, TCPSelectorListener.Passive listener); + + public void register(SocketChannel channel, TCPSelectorListener.Active listener); + + public void setConnectInterest(SocketChannel channel, boolean on); + + public void setReadInterest(SocketChannel channel, boolean on); + + public void setWriteInterest(SocketChannel channel, boolean on); } |