diff options
7 files changed, 162 insertions, 64 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java index df9a3f790c..676eb9dd3b 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java @@ -27,9 +27,11 @@ import org.eclipse.internal.net4j.transport.ChannelImpl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.List; import java.util.Queue; +import java.util.concurrent.TimeoutException; /** * @author Eike Stepper @@ -40,10 +42,14 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements private static final ContextTracer TRACER = new ContextTracer(Net4j.DEBUG_CONNECTOR, AbstractTCPConnector.class); + private static final long REGISTER_SELECTOR_TIMEOUT = 250000; + private SocketChannel socketChannel; private TCPSelector selector; + private SelectionKey selectionKey; + private Buffer inputBuffer; private ControlChannelImpl controlChannel; @@ -91,7 +97,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements */ public void multiplexBuffer(Channel channel) { - selector.setWriteInterest(socketChannel, true); + selector.setWriteInterest(selectionKey, true); } public void handleConnect(TCPSelector selector, SocketChannel channel) @@ -110,7 +116,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements try { - selector.setConnectInterest(socketChannel, false); + selector.setConnectInterest(selectionKey, false); setState(State.NEGOTIATING); } catch (Exception ex) @@ -193,7 +199,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements if (!moreToWrite) { - selector.setWriteInterest(socketChannel, false); + selector.setWriteInterest(selectionKey, false); } } catch (NullPointerException ignore) @@ -263,10 +269,15 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements controlChannel = new ControlChannelImpl(this); controlChannel.activate(); - selector.register(socketChannel, this); + selectionKey = selector.register(socketChannel, this, REGISTER_SELECTOR_TIMEOUT); + if (selectionKey == null) + { + throw new TimeoutException("Unable to register channel with selector"); + } + if (getType() == Type.SERVER) { - selector.setConnectInterest(socketChannel, false); + selector.setConnectInterest(selectionKey, false); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java index 5716858969..f645629767 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java @@ -12,7 +12,6 @@ package org.eclipse.internal.net4j.transport.tcp; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.nio.channels.SelectionKey; /** * @author Eike Stepper diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java index d4d3ec96b0..2634313ba2 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.Channel; -import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -325,7 +324,7 @@ public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, B serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(sAddr); - selector.register(serverSocketChannel, this); + selector.registerAsync(serverSocketChannel, this); } @Override diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java index 6293ae35ea..b1028469f2 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java @@ -12,6 +12,8 @@ package org.eclipse.internal.net4j.transport.tcp; import org.eclipse.net4j.transport.tcp.TCPSelector; import org.eclipse.net4j.transport.tcp.TCPSelectorListener; +import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Active; +import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Passive; import org.eclipse.net4j.util.lifecycle.AbstractLifecycle; import org.eclipse.net4j.util.om.trace.ContextTracer; @@ -29,6 +31,8 @@ import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * @author Eike Stepper @@ -50,102 +54,130 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R { } - public void invokeLater(Runnable operation) + public void invokeAsync(final Runnable operation) { pendingOperations.add(operation); selector.wakeup(); } - public void register(final ServerSocketChannel channel, final TCPSelectorListener.Passive listener) + public void registerAsync(final ServerSocketChannel channel, final Passive listener) { - if (listener == null) + assertValidListener(listener); + invokeAsync(new Runnable() { - throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$ - } + public void run() + { + doRegister(channel, listener); + } + }); + } - invokeLater(new Runnable() + public void registerAsync(final SocketChannel channel, final Active listener) + { + assertValidListener(listener); + invokeAsync(new Runnable() { public void run() { - if (TRACER.isEnabled()) - { - TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ - } - - try - { - channel.register(selector, SelectionKey.OP_ACCEPT, listener); - } - catch (ClosedChannelException ignore) - { - ; - } + doRegister(channel, listener); } }); } - public void register(final SocketChannel channel, final TCPSelectorListener.Active listener) + public boolean invoke(final Runnable operation, long timeout) { - if (listener == null) + final CountDownLatch latch = new CountDownLatch(1); + pendingOperations.add(new Runnable() { - throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$ + public void run() + { + operation.run(); + latch.countDown(); + } + }); + + selector.wakeup(); + + try + { + boolean result = latch.await(timeout, TimeUnit.MILLISECONDS); + return result; + } + catch (InterruptedException ex) + { + return false; } + } - invokeLater(new Runnable() + public SelectionKey register(final ServerSocketChannel channel, final Passive listener, + long timeout) + { + assertValidListener(listener); + boolean success = invoke(new Runnable() { public void run() { - if (TRACER.isEnabled()) - { - TRACER.trace(TCPSelectorImpl.this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ - } + doRegister(channel, listener); + } + }, timeout); - try - { - channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener); - } - catch (ClosedChannelException ignore) - { - ; - } + if (!success) + { + return null; + } + + return channel.keyFor(selector); + } + + public SelectionKey register(final SocketChannel channel, final Active listener, long timeout) + { + assertValidListener(listener); + boolean success = invoke(new Runnable() + { + public void run() + { + System.out.println("START REGISTER"); + doRegister(channel, listener); + System.out.println("STOP REGISTER"); } - }); + }, timeout); + + if (!success) + { + return null; + } + + return channel.keyFor(selector); } - public void setConnectInterest(final SocketChannel channel, final boolean on) + public void setConnectInterest(final SelectionKey selectionKey, final boolean on) { - invokeLater(new Runnable() + invokeAsync(new Runnable() { public void run() { - // TODO Optimize this lookup away - SelectionKey selectionKey = channel.keyFor(selector); TCPUtil.setConnectInterest(selectionKey, on); } }); } - public void setReadInterest(final SocketChannel channel, final boolean on) + public void setReadInterest(final SelectionKey selectionKey, final boolean on) { - invokeLater(new Runnable() + invokeAsync(new Runnable() { public void run() { - // TODO Optimize this lookup away - SelectionKey selectionKey = channel.keyFor(selector); TCPUtil.setReadInterest(selectionKey, on); } }); } - public void setWriteInterest(final SocketChannel channel, final boolean on) + public void setWriteInterest(final SelectionKey selectionKey, final boolean on) { - invokeLater(new Runnable() + invokeAsync(new Runnable() { public void run() { - // TODO Optimize this lookup away - SelectionKey selectionKey = channel.keyFor(selector); TCPUtil.setWriteInterest(selectionKey, on); } }); @@ -323,4 +355,47 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R throw exception; } } + + private void assertValidListener(Object listener) + { + if (listener == null) + { + throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$ + } + } + + private void doRegister(final ServerSocketChannel channel, + final TCPSelectorListener.Passive listener) + { + if (TRACER.isEnabled()) + { + TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ + } + + try + { + channel.register(selector, SelectionKey.OP_ACCEPT, listener); + } + catch (ClosedChannelException ignore) + { + ; + } + } + + private void doRegister(final SocketChannel channel, final TCPSelectorListener.Active listener) + { + if (TRACER.isEnabled()) + { + TRACER.trace(TCPSelectorImpl.this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ + } + + try + { + channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener); + } + catch (ClosedChannelException ignore) + { + ; + } + } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java index d49d869518..7e35bcec2a 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java @@ -10,6 +10,10 @@ **************************************************************************/ package org.eclipse.net4j.transport.tcp; +import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Active; +import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Passive; + +import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -18,15 +22,21 @@ import java.nio.channels.SocketChannel; */ public interface TCPSelector { - public void invokeLater(Runnable operation); + public void invokeAsync(Runnable operation); + + public void registerAsync(ServerSocketChannel channel, Passive listener); + + public void registerAsync(SocketChannel channel, Active listener); + + public boolean invoke(Runnable operation, long timeout); - public void register(ServerSocketChannel channel, TCPSelectorListener.Passive listener); + public SelectionKey register(ServerSocketChannel channel, Passive listener, long timeout); - public void register(SocketChannel channel, TCPSelectorListener.Active listener); + public SelectionKey register(SocketChannel channel, Active listener, long timeout); - public void setConnectInterest(SocketChannel channel, boolean on); + public void setConnectInterest(SelectionKey selectionKey, boolean on); - public void setReadInterest(SocketChannel channel, boolean on); + public void setReadInterest(SelectionKey selectionKey, boolean on); - public void setWriteInterest(SocketChannel channel, boolean on); + public void setWriteInterest(SelectionKey selectionKey, boolean on); } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java index 4539855c14..a78a61ba84 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java @@ -10,7 +10,11 @@ **************************************************************************/ package org.eclipse.net4j.util.concurrent; +import java.util.concurrent.ExecutorCompletionService; + /** + * TODO Replace by {@link ExecutorCompletionService}? + * * @author Eike Stepper */ public interface WorkSerializer diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/registry/AbstractCachingRegistry.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/registry/AbstractCachingRegistry.java index b9f8ea24cb..dd060146b4 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/registry/AbstractCachingRegistry.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/registry/AbstractCachingRegistry.java @@ -18,8 +18,8 @@ import java.util.Map; import java.util.Set; /** - * TODO Check if all methods of DelegatingRegistry still do what they should. - * TODO Remove DelegatingRegistry? + * TODO Check if all methods of {@link DelegatingRegistry} still do what they should. + * TODO Remove {@link DelegatingRegistry}? * * @author Eike Stepper */ |