Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java21
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java1
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java3
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java171
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java22
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java4
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/registry/AbstractCachingRegistry.java4
7 files changed, 162 insertions, 64 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
index df9a3f790c..676eb9dd3b 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
@@ -27,9 +27,11 @@ import org.eclipse.internal.net4j.transport.ChannelImpl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.TimeoutException;
/**
* @author Eike Stepper
@@ -40,10 +42,14 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
private static final ContextTracer TRACER = new ContextTracer(Net4j.DEBUG_CONNECTOR,
AbstractTCPConnector.class);
+ private static final long REGISTER_SELECTOR_TIMEOUT = 250000;
+
private SocketChannel socketChannel;
private TCPSelector selector;
+ private SelectionKey selectionKey;
+
private Buffer inputBuffer;
private ControlChannelImpl controlChannel;
@@ -91,7 +97,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
*/
public void multiplexBuffer(Channel channel)
{
- selector.setWriteInterest(socketChannel, true);
+ selector.setWriteInterest(selectionKey, true);
}
public void handleConnect(TCPSelector selector, SocketChannel channel)
@@ -110,7 +116,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
try
{
- selector.setConnectInterest(socketChannel, false);
+ selector.setConnectInterest(selectionKey, false);
setState(State.NEGOTIATING);
}
catch (Exception ex)
@@ -193,7 +199,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
if (!moreToWrite)
{
- selector.setWriteInterest(socketChannel, false);
+ selector.setWriteInterest(selectionKey, false);
}
}
catch (NullPointerException ignore)
@@ -263,10 +269,15 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
controlChannel = new ControlChannelImpl(this);
controlChannel.activate();
- selector.register(socketChannel, this);
+ selectionKey = selector.register(socketChannel, this, REGISTER_SELECTOR_TIMEOUT);
+ if (selectionKey == null)
+ {
+ throw new TimeoutException("Unable to register channel with selector");
+ }
+
if (getType() == Type.SERVER)
{
- selector.setConnectInterest(socketChannel, false);
+ selector.setConnectInterest(selectionKey, false);
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java
index 5716858969..f645629767 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java
@@ -12,7 +12,6 @@ package org.eclipse.internal.net4j.transport.tcp;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
/**
* @author Eike Stepper
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java
index d4d3ec96b0..2634313ba2 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java
@@ -33,7 +33,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
-import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@@ -325,7 +324,7 @@ public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, B
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(sAddr);
- selector.register(serverSocketChannel, this);
+ selector.registerAsync(serverSocketChannel, this);
}
@Override
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
index 6293ae35ea..b1028469f2 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
@@ -12,6 +12,8 @@ package org.eclipse.internal.net4j.transport.tcp;
import org.eclipse.net4j.transport.tcp.TCPSelector;
import org.eclipse.net4j.transport.tcp.TCPSelectorListener;
+import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Active;
+import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Passive;
import org.eclipse.net4j.util.lifecycle.AbstractLifecycle;
import org.eclipse.net4j.util.om.trace.ContextTracer;
@@ -29,6 +31,8 @@ import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* @author Eike Stepper
@@ -50,102 +54,130 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R
{
}
- public void invokeLater(Runnable operation)
+ public void invokeAsync(final Runnable operation)
{
pendingOperations.add(operation);
selector.wakeup();
}
- public void register(final ServerSocketChannel channel, final TCPSelectorListener.Passive listener)
+ public void registerAsync(final ServerSocketChannel channel, final Passive listener)
{
- if (listener == null)
+ assertValidListener(listener);
+ invokeAsync(new Runnable()
{
- throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$
- }
+ public void run()
+ {
+ doRegister(channel, listener);
+ }
+ });
+ }
- invokeLater(new Runnable()
+ public void registerAsync(final SocketChannel channel, final Active listener)
+ {
+ assertValidListener(listener);
+ invokeAsync(new Runnable()
{
public void run()
{
- if (TRACER.isEnabled())
- {
- TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
- }
-
- try
- {
- channel.register(selector, SelectionKey.OP_ACCEPT, listener);
- }
- catch (ClosedChannelException ignore)
- {
- ;
- }
+ doRegister(channel, listener);
}
});
}
- public void register(final SocketChannel channel, final TCPSelectorListener.Active listener)
+ public boolean invoke(final Runnable operation, long timeout)
{
- if (listener == null)
+ final CountDownLatch latch = new CountDownLatch(1);
+ pendingOperations.add(new Runnable()
{
- throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$
+ public void run()
+ {
+ operation.run();
+ latch.countDown();
+ }
+ });
+
+ selector.wakeup();
+
+ try
+ {
+ boolean result = latch.await(timeout, TimeUnit.MILLISECONDS);
+ return result;
+ }
+ catch (InterruptedException ex)
+ {
+ return false;
}
+ }
- invokeLater(new Runnable()
+ public SelectionKey register(final ServerSocketChannel channel, final Passive listener,
+ long timeout)
+ {
+ assertValidListener(listener);
+ boolean success = invoke(new Runnable()
{
public void run()
{
- if (TRACER.isEnabled())
- {
- TRACER.trace(TCPSelectorImpl.this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
- }
+ doRegister(channel, listener);
+ }
+ }, timeout);
- try
- {
- channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener);
- }
- catch (ClosedChannelException ignore)
- {
- ;
- }
+ if (!success)
+ {
+ return null;
+ }
+
+ return channel.keyFor(selector);
+ }
+
+ public SelectionKey register(final SocketChannel channel, final Active listener, long timeout)
+ {
+ assertValidListener(listener);
+ boolean success = invoke(new Runnable()
+ {
+ public void run()
+ {
+ System.out.println("START REGISTER");
+ doRegister(channel, listener);
+ System.out.println("STOP REGISTER");
}
- });
+ }, timeout);
+
+ if (!success)
+ {
+ return null;
+ }
+
+ return channel.keyFor(selector);
}
- public void setConnectInterest(final SocketChannel channel, final boolean on)
+ public void setConnectInterest(final SelectionKey selectionKey, final boolean on)
{
- invokeLater(new Runnable()
+ invokeAsync(new Runnable()
{
public void run()
{
- // TODO Optimize this lookup away
- SelectionKey selectionKey = channel.keyFor(selector);
TCPUtil.setConnectInterest(selectionKey, on);
}
});
}
- public void setReadInterest(final SocketChannel channel, final boolean on)
+ public void setReadInterest(final SelectionKey selectionKey, final boolean on)
{
- invokeLater(new Runnable()
+ invokeAsync(new Runnable()
{
public void run()
{
- // TODO Optimize this lookup away
- SelectionKey selectionKey = channel.keyFor(selector);
TCPUtil.setReadInterest(selectionKey, on);
}
});
}
- public void setWriteInterest(final SocketChannel channel, final boolean on)
+ public void setWriteInterest(final SelectionKey selectionKey, final boolean on)
{
- invokeLater(new Runnable()
+ invokeAsync(new Runnable()
{
public void run()
{
- // TODO Optimize this lookup away
- SelectionKey selectionKey = channel.keyFor(selector);
TCPUtil.setWriteInterest(selectionKey, on);
}
});
@@ -323,4 +355,47 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R
throw exception;
}
}
+
+ private void assertValidListener(Object listener)
+ {
+ if (listener == null)
+ {
+ throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$
+ }
+ }
+
+ private void doRegister(final ServerSocketChannel channel,
+ final TCPSelectorListener.Passive listener)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
+ }
+
+ try
+ {
+ channel.register(selector, SelectionKey.OP_ACCEPT, listener);
+ }
+ catch (ClosedChannelException ignore)
+ {
+ ;
+ }
+ }
+
+ private void doRegister(final SocketChannel channel, final TCPSelectorListener.Active listener)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace(TCPSelectorImpl.this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
+ }
+
+ try
+ {
+ channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener);
+ }
+ catch (ClosedChannelException ignore)
+ {
+ ;
+ }
+ }
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java
index d49d869518..7e35bcec2a 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java
@@ -10,6 +10,10 @@
**************************************************************************/
package org.eclipse.net4j.transport.tcp;
+import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Active;
+import org.eclipse.net4j.transport.tcp.TCPSelectorListener.Passive;
+
+import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@@ -18,15 +22,21 @@ import java.nio.channels.SocketChannel;
*/
public interface TCPSelector
{
- public void invokeLater(Runnable operation);
+ public void invokeAsync(Runnable operation);
+
+ public void registerAsync(ServerSocketChannel channel, Passive listener);
+
+ public void registerAsync(SocketChannel channel, Active listener);
+
+ public boolean invoke(Runnable operation, long timeout);
- public void register(ServerSocketChannel channel, TCPSelectorListener.Passive listener);
+ public SelectionKey register(ServerSocketChannel channel, Passive listener, long timeout);
- public void register(SocketChannel channel, TCPSelectorListener.Active listener);
+ public SelectionKey register(SocketChannel channel, Active listener, long timeout);
- public void setConnectInterest(SocketChannel channel, boolean on);
+ public void setConnectInterest(SelectionKey selectionKey, boolean on);
- public void setReadInterest(SocketChannel channel, boolean on);
+ public void setReadInterest(SelectionKey selectionKey, boolean on);
- public void setWriteInterest(SocketChannel channel, boolean on);
+ public void setWriteInterest(SelectionKey selectionKey, boolean on);
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java
index 4539855c14..a78a61ba84 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/concurrent/WorkSerializer.java
@@ -10,7 +10,11 @@
**************************************************************************/
package org.eclipse.net4j.util.concurrent;
+import java.util.concurrent.ExecutorCompletionService;
+
/**
+ * TODO Replace by {@link ExecutorCompletionService}?
+ *
* @author Eike Stepper
*/
public interface WorkSerializer
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/registry/AbstractCachingRegistry.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/registry/AbstractCachingRegistry.java
index b9f8ea24cb..dd060146b4 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/registry/AbstractCachingRegistry.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/util/registry/AbstractCachingRegistry.java
@@ -18,8 +18,8 @@ import java.util.Map;
import java.util.Set;
/**
- * TODO Check if all methods of DelegatingRegistry still do what they should.
- * TODO Remove DelegatingRegistry?
+ * TODO Check if all methods of {@link DelegatingRegistry} still do what they should.
+ * TODO Remove {@link DelegatingRegistry}?
*
* @author Eike Stepper
*/

Back to the top