Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java')
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java171
1 files changed, 123 insertions, 48 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
index 6293ae35ea..b1028469f2 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
@@ -12,6 +12,8 @@ package org.eclipse.internal.net4j.transport.tcp;
import org.eclipse.net4j.transport.tcp.TCPSelector;
import org.eclipse.net4j.transport.tcp.TCPSelectorListener;
+import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Active;
+import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Passive;
import org.eclipse.net4j.util.lifecycle.AbstractLifecycle;
import org.eclipse.net4j.util.om.trace.ContextTracer;
@@ -29,6 +31,8 @@ import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* @author Eike Stepper
@@ -50,102 +54,130 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R
{
}
- public void invokeLater(Runnable operation)
+ public void invokeAsync(final Runnable operation)
{
pendingOperations.add(operation);
selector.wakeup();
}
- public void register(final ServerSocketChannel channel, final TCPSelectorListener.Passive listener)
+ public void registerAsync(final ServerSocketChannel channel, final Passive listener)
{
- if (listener == null)
+ assertValidListener(listener);
+ invokeAsync(new Runnable()
{
- throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$
- }
+ public void run()
+ {
+ doRegister(channel, listener);
+ }
+ });
+ }
- invokeLater(new Runnable()
+ public void registerAsync(final SocketChannel channel, final Active listener)
+ {
+ assertValidListener(listener);
+ invokeAsync(new Runnable()
{
public void run()
{
- if (TRACER.isEnabled())
- {
- TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
- }
-
- try
- {
- channel.register(selector, SelectionKey.OP_ACCEPT, listener);
- }
- catch (ClosedChannelException ignore)
- {
- ;
- }
+ doRegister(channel, listener);
}
});
}
- public void register(final SocketChannel channel, final TCPSelectorListener.Active listener)
+ public boolean invoke(final Runnable operation, long timeout)
{
- if (listener == null)
+ final CountDownLatch latch = new CountDownLatch(1);
+ pendingOperations.add(new Runnable()
{
- throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$
+ public void run()
+ {
+ operation.run();
+ latch.countDown();
+ }
+ });
+
+ selector.wakeup();
+
+ try
+ {
+ boolean result = latch.await(timeout, TimeUnit.MILLISECONDS);
+ return result;
+ }
+ catch (InterruptedException ex)
+ {
+ return false;
}
+ }
- invokeLater(new Runnable()
+ public SelectionKey register(final ServerSocketChannel channel, final Passive listener,
+ long timeout)
+ {
+ assertValidListener(listener);
+ boolean success = invoke(new Runnable()
{
public void run()
{
- if (TRACER.isEnabled())
- {
- TRACER.trace(TCPSelectorImpl.this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
- }
+ doRegister(channel, listener);
+ }
+ }, timeout);
- try
- {
- channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener);
- }
- catch (ClosedChannelException ignore)
- {
- ;
- }
+ if (!success)
+ {
+ return null;
+ }
+
+ return channel.keyFor(selector);
+ }
+
+ public SelectionKey register(final SocketChannel channel, final Active listener, long timeout)
+ {
+ assertValidListener(listener);
+ boolean success = invoke(new Runnable()
+ {
+ public void run()
+ {
+ System.out.println("START REGISTER");
+ doRegister(channel, listener);
+ System.out.println("STOP REGISTER");
}
- });
+ }, timeout);
+
+ if (!success)
+ {
+ return null;
+ }
+
+ return channel.keyFor(selector);
}
- public void setConnectInterest(final SocketChannel channel, final boolean on)
+ public void setConnectInterest(final SelectionKey selectionKey, final boolean on)
{
- invokeLater(new Runnable()
+ invokeAsync(new Runnable()
{
public void run()
{
- // TODO Optimize this lookup away
- SelectionKey selectionKey = channel.keyFor(selector);
TCPUtil.setConnectInterest(selectionKey, on);
}
});
}
- public void setReadInterest(final SocketChannel channel, final boolean on)
+ public void setReadInterest(final SelectionKey selectionKey, final boolean on)
{
- invokeLater(new Runnable()
+ invokeAsync(new Runnable()
{
public void run()
{
- // TODO Optimize this lookup away
- SelectionKey selectionKey = channel.keyFor(selector);
TCPUtil.setReadInterest(selectionKey, on);
}
});
}
- public void setWriteInterest(final SocketChannel channel, final boolean on)
+ public void setWriteInterest(final SelectionKey selectionKey, final boolean on)
{
- invokeLater(new Runnable()
+ invokeAsync(new Runnable()
{
public void run()
{
- // TODO Optimize this lookup away
- SelectionKey selectionKey = channel.keyFor(selector);
TCPUtil.setWriteInterest(selectionKey, on);
}
});
@@ -323,4 +355,47 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R
throw exception;
}
}
+
+ private void assertValidListener(Object listener)
+ {
+ if (listener == null)
+ {
+ throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$
+ }
+ }
+
+ private void doRegister(final ServerSocketChannel channel,
+ final TCPSelectorListener.Passive listener)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
+ }
+
+ try
+ {
+ channel.register(selector, SelectionKey.OP_ACCEPT, listener);
+ }
+ catch (ClosedChannelException ignore)
+ {
+ ;
+ }
+ }
+
+ private void doRegister(final SocketChannel channel, final TCPSelectorListener.Active listener)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace(TCPSelectorImpl.this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
+ }
+
+ try
+ {
+ channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener);
+ }
+ catch (ClosedChannelException ignore)
+ {
+ ;
+ }
+ }
}

Back to the top