diff options
Diffstat (limited to 'plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java')
-rw-r--r-- | plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java | 171 |
1 files changed, 123 insertions, 48 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java index 6293ae35ea..b1028469f2 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java @@ -12,6 +12,8 @@ package org.eclipse.internal.net4j.transport.tcp; import org.eclipse.net4j.transport.tcp.TCPSelector; import org.eclipse.net4j.transport.tcp.TCPSelectorListener; +import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Active; +import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Passive; import org.eclipse.net4j.util.lifecycle.AbstractLifecycle; import org.eclipse.net4j.util.om.trace.ContextTracer; @@ -29,6 +31,8 @@ import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * @author Eike Stepper @@ -50,102 +54,130 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R { } - public void invokeLater(Runnable operation) + public void invokeAsync(final Runnable operation) { pendingOperations.add(operation); selector.wakeup(); } - public void register(final ServerSocketChannel channel, final TCPSelectorListener.Passive listener) + public void registerAsync(final ServerSocketChannel channel, final Passive listener) { - if (listener == null) + assertValidListener(listener); + invokeAsync(new Runnable() { - throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$ - } + public void run() + { + doRegister(channel, listener); + } + }); + } - invokeLater(new Runnable() + public void registerAsync(final SocketChannel channel, final Active listener) + { + assertValidListener(listener); + invokeAsync(new Runnable() { public void run() { - if (TRACER.isEnabled()) - { - TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ - } - - try - { - channel.register(selector, SelectionKey.OP_ACCEPT, listener); - } - catch (ClosedChannelException ignore) - { - ; - } + doRegister(channel, listener); } }); } - public void register(final SocketChannel channel, final TCPSelectorListener.Active listener) + public boolean invoke(final Runnable operation, long timeout) { - if (listener == null) + final CountDownLatch latch = new CountDownLatch(1); + pendingOperations.add(new Runnable() { - throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$ + public void run() + { + operation.run(); + latch.countDown(); + } + }); + + selector.wakeup(); + + try + { + boolean result = latch.await(timeout, TimeUnit.MILLISECONDS); + return result; + } + catch (InterruptedException ex) + { + return false; } + } - invokeLater(new Runnable() + public SelectionKey register(final ServerSocketChannel channel, final Passive listener, + long timeout) + { + assertValidListener(listener); + boolean success = invoke(new Runnable() { public void run() { - if (TRACER.isEnabled()) - { - TRACER.trace(TCPSelectorImpl.this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ - } + doRegister(channel, listener); + } + }, timeout); - try - { - channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener); - } - catch (ClosedChannelException ignore) - { - ; - } + if (!success) + { + return null; + } + + return channel.keyFor(selector); + } + + public SelectionKey register(final SocketChannel channel, final Active listener, long timeout) + { + assertValidListener(listener); + boolean success = invoke(new Runnable() + { + public void run() + { + System.out.println("START REGISTER"); + doRegister(channel, listener); + System.out.println("STOP REGISTER"); } - }); + }, timeout); + + if (!success) + { + return null; + } + + return channel.keyFor(selector); } - public void setConnectInterest(final SocketChannel channel, final boolean on) + public void setConnectInterest(final SelectionKey selectionKey, final boolean on) { - invokeLater(new Runnable() + invokeAsync(new Runnable() { public void run() { - // TODO Optimize this lookup away - SelectionKey selectionKey = channel.keyFor(selector); TCPUtil.setConnectInterest(selectionKey, on); } }); } - public void setReadInterest(final SocketChannel channel, final boolean on) + public void setReadInterest(final SelectionKey selectionKey, final boolean on) { - invokeLater(new Runnable() + invokeAsync(new Runnable() { public void run() { - // TODO Optimize this lookup away - SelectionKey selectionKey = channel.keyFor(selector); TCPUtil.setReadInterest(selectionKey, on); } }); } - public void setWriteInterest(final SocketChannel channel, final boolean on) + public void setWriteInterest(final SelectionKey selectionKey, final boolean on) { - invokeLater(new Runnable() + invokeAsync(new Runnable() { public void run() { - // TODO Optimize this lookup away - SelectionKey selectionKey = channel.keyFor(selector); TCPUtil.setWriteInterest(selectionKey, on); } }); @@ -323,4 +355,47 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R throw exception; } } + + private void assertValidListener(Object listener) + { + if (listener == null) + { + throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$ + } + } + + private void doRegister(final ServerSocketChannel channel, + final TCPSelectorListener.Passive listener) + { + if (TRACER.isEnabled()) + { + TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ + } + + try + { + channel.register(selector, SelectionKey.OP_ACCEPT, listener); + } + catch (ClosedChannelException ignore) + { + ; + } + } + + private void doRegister(final SocketChannel channel, final TCPSelectorListener.Active listener) + { + if (TRACER.isEnabled()) + { + TRACER.trace(TCPSelectorImpl.this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$ + } + + try + { + channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener); + } + catch (ClosedChannelException ignore) + { + ; + } + } } |