Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2006-10-31 12:19:07 +0000
committerEike Stepper2006-10-31 12:19:07 +0000
commitb0b27564edd824367b4051e645b3337578aa430f (patch)
tree9bcb7c69090d101dc11503be7625a0ef9b59225d /plugins/org.eclipse.net4j
parentbdabfd099b427286ba1643241ea4649186880c4d (diff)
downloadcdo-b0b27564edd824367b4051e645b3337578aa430f.tar.gz
cdo-b0b27564edd824367b4051e645b3337578aa430f.tar.xz
cdo-b0b27564edd824367b4051e645b3337578aa430f.zip
Reduced CDP load by making selector ops async and the select call blocking indefinetely
Diffstat (limited to 'plugins/org.eclipse.net4j')
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java59
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java3
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java47
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java109
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java1
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java16
6 files changed, 122 insertions, 113 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
index 76350c5eff..df9a3f790c 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
@@ -27,7 +27,6 @@ import org.eclipse.internal.net4j.transport.ChannelImpl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Queue;
@@ -45,8 +44,6 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
private TCPSelector selector;
- private SelectionKey selectionKey;
-
private Buffer inputBuffer;
private ControlChannelImpl controlChannel;
@@ -87,11 +84,6 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
return socketChannel;
}
- public SelectionKey getSelectionKey()
- {
- return selectionKey;
- }
-
/**
* Called by {@link ChannelImpl} each time a new buffer is available for
* multiplexing. This or another buffer can be dequeued from the outputQueue
@@ -99,7 +91,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
*/
public void multiplexBuffer(Channel channel)
{
- TCPUtil.setWriteInterest(selectionKey, true);
+ selector.setWriteInterest(socketChannel, true);
}
public void handleConnect(TCPSelector selector, SocketChannel channel)
@@ -118,8 +110,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
try
{
- TCPUtil.setConnectInterest(selectionKey, false);
- TCPUtil.setReadInterest(selectionKey, true);
+ selector.setConnectInterest(socketChannel, false);
setState(State.NEGOTIATING);
}
catch (Exception ex)
@@ -131,13 +122,6 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
public void handleRead(TCPSelector selector, SocketChannel socketChannel)
{
- // TODO Is this needed?
- if (!socketChannel.isConnected())
- {
- deactivate();
- return;
- }
-
try
{
if (inputBuffer == null)
@@ -181,13 +165,6 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
public void handleWrite(TCPSelector selector, SocketChannel socketChannel)
{
- // TODO Is this needed?
- if (!socketChannel.isConnected())
- {
- deactivate();
- return;
- }
-
try
{
boolean moreToWrite = false;
@@ -216,9 +193,13 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
if (!moreToWrite)
{
- TCPUtil.setWriteInterest(selectionKey, false);
+ selector.setWriteInterest(socketChannel, false);
}
}
+ catch (NullPointerException ignore)
+ {
+ ;
+ }
catch (ClosedChannelException ex)
{
deactivate();
@@ -282,10 +263,10 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
controlChannel = new ControlChannelImpl(this);
controlChannel.activate();
- selectionKey = selector.register(socketChannel, this);
+ selector.register(socketChannel, this);
if (getType() == Type.SERVER)
{
- TCPUtil.setConnectInterest(selectionKey, false);
+ selector.setConnectInterest(socketChannel, false);
}
}
@@ -296,9 +277,9 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
try
{
- selectionKey.cancel();
+ controlChannel.deactivate();
}
- catch (RuntimeException ex)
+ catch (Exception ex)
{
if (exception == null)
{
@@ -307,7 +288,7 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
}
finally
{
- selectionKey = null;
+ controlChannel = null;
}
try
@@ -328,22 +309,6 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
try
{
- controlChannel.deactivate();
- }
- catch (Exception ex)
- {
- if (exception == null)
- {
- exception = ex;
- }
- }
- finally
- {
- controlChannel = null;
- }
-
- try
- {
super.onDeactivate();
}
catch (Exception ex)
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java
index 705729320d..5716858969 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java
@@ -72,9 +72,6 @@ public class ClientTCPConnectorImpl extends AbstractTCPConnector
protected void onActivate() throws Exception
{
super.onActivate();
- SelectionKey selKey = getSelectionKey();
- selKey.interestOps(selKey.interestOps() | SelectionKey.OP_CONNECT);
-
InetAddress addr = InetAddress.getByName(host);
InetSocketAddress sAddr = new InetSocketAddress(addr, port);
getSocketChannel().connect(sAddr);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java
index b283cbfa80..d4d3ec96b0 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java
@@ -34,7 +34,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SelectionKey;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
@@ -65,8 +65,6 @@ public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, B
private ServerSocketChannel serverSocketChannel;
- private SelectionKey selectionKey;
-
private Set<TCPConnector> acceptedConnectors = new HashSet(0);
/**
@@ -213,7 +211,7 @@ public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, B
addConnector(socketChannel);
}
}
- catch (ClosedByInterruptException ex)
+ catch (ClosedChannelException ex)
{
deactivate();
}
@@ -327,7 +325,7 @@ public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, B
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(sAddr);
- selectionKey = selector.register(serverSocketChannel, this);
+ selector.register(serverSocketChannel, this);
}
@Override
@@ -338,43 +336,6 @@ public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, B
LifecycleUtil.deactivate(connector);
}
- Exception exception = null;
-
- try
- {
- selectionKey.cancel();
- }
- catch (RuntimeException ex)
- {
- if (exception == null)
- {
- exception = ex;
- }
- }
- finally
- {
- selectionKey = null;
- }
-
- try
- {
- serverSocketChannel.close();
- }
- catch (RuntimeException ex)
- {
- if (exception == null)
- {
- exception = ex;
- }
- }
- finally
- {
- serverSocketChannel = null;
- }
-
- if (exception != null)
- {
- throw exception;
- }
+ serverSocketChannel.close();
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
index 037a9f1cf0..6293ae35ea 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
@@ -27,6 +27,8 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author Eike Stepper
@@ -40,42 +42,113 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R
private Selector selector;
+ private Queue<Runnable> pendingOperations = new ConcurrentLinkedQueue();
+
private Thread thread;
public TCPSelectorImpl()
{
}
- public SelectionKey register(ServerSocketChannel channel, TCPSelectorListener.Passive listener)
- throws ClosedChannelException
+ public void invokeLater(Runnable operation)
+ {
+ pendingOperations.add(operation);
+ selector.wakeup();
+ }
+
+ public void register(final ServerSocketChannel channel, final TCPSelectorListener.Passive listener)
{
if (listener == null)
{
throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$
}
- if (TRACER.isEnabled())
+ invokeLater(new Runnable()
{
- TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
- }
+ public void run()
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
+ }
- return channel.register(selector, SelectionKey.OP_ACCEPT, listener);
+ try
+ {
+ channel.register(selector, SelectionKey.OP_ACCEPT, listener);
+ }
+ catch (ClosedChannelException ignore)
+ {
+ ;
+ }
+ }
+ });
}
- public SelectionKey register(SocketChannel channel, TCPSelectorListener.Active listener)
- throws ClosedChannelException
+ public void register(final SocketChannel channel, final TCPSelectorListener.Active listener)
{
if (listener == null)
{
throw new IllegalArgumentException("listener == null"); //$NON-NLS-1$
}
- if (TRACER.isEnabled())
+ invokeLater(new Runnable()
{
- TRACER.trace(this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
- }
+ public void run()
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace(TCPSelectorImpl.this, "Registering " + TCPUtil.toString(channel)); //$NON-NLS-1$
+ }
- return channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener);
+ try
+ {
+ channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener);
+ }
+ catch (ClosedChannelException ignore)
+ {
+ ;
+ }
+ }
+ });
+ }
+
+ public void setConnectInterest(final SocketChannel channel, final boolean on)
+ {
+ invokeLater(new Runnable()
+ {
+ public void run()
+ {
+ // TODO Optimize this lookup away
+ SelectionKey selectionKey = channel.keyFor(selector);
+ TCPUtil.setConnectInterest(selectionKey, on);
+ }
+ });
+ }
+
+ public void setReadInterest(final SocketChannel channel, final boolean on)
+ {
+ invokeLater(new Runnable()
+ {
+ public void run()
+ {
+ // TODO Optimize this lookup away
+ SelectionKey selectionKey = channel.keyFor(selector);
+ TCPUtil.setReadInterest(selectionKey, on);
+ }
+ });
+ }
+
+ public void setWriteInterest(final SocketChannel channel, final boolean on)
+ {
+ invokeLater(new Runnable()
+ {
+ public void run()
+ {
+ // TODO Optimize this lookup away
+ SelectionKey selectionKey = channel.keyFor(selector);
+ TCPUtil.setWriteInterest(selectionKey, on);
+ }
+ });
}
public void run()
@@ -90,7 +163,13 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R
try
{
- if (selector.select(SELECT_TIMEOUT) > 0)
+ Runnable operation;
+ while ((operation = pendingOperations.poll()) != null)
+ {
+ operation.run();
+ }
+
+ if (selector.select() > 0)
{
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext())
@@ -106,6 +185,10 @@ public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, R
{
; // Do nothing
}
+ catch (NullPointerException ignore)
+ {
+ ; // Do nothing
+ }
catch (Exception ex)
{
Net4j.LOG.error(ex);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java
index a12b92982f..af664c0263 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java
@@ -92,7 +92,6 @@ public final class TCPUtil
}
selectionKey.interestOps(newOps);
- selectionKey.selector().wakeup();
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java
index 80a4df9fb1..d49d869518 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/tcp/TCPSelector.java
@@ -10,8 +10,6 @@
**************************************************************************/
package org.eclipse.net4j.transport.tcp;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@@ -20,9 +18,15 @@ import java.nio.channels.SocketChannel;
*/
public interface TCPSelector
{
- public SelectionKey register(ServerSocketChannel channel, TCPSelectorListener.Passive listener)
- throws ClosedChannelException;
+ public void invokeLater(Runnable operation);
- public SelectionKey register(SocketChannel channel, TCPSelectorListener.Active listener)
- throws ClosedChannelException;
+ public void register(ServerSocketChannel channel, TCPSelectorListener.Passive listener);
+
+ public void register(SocketChannel channel, TCPSelectorListener.Active listener);
+
+ public void setConnectInterest(SocketChannel channel, boolean on);
+
+ public void setReadInterest(SocketChannel channel, boolean on);
+
+ public void setWriteInterest(SocketChannel channel, boolean on);
}

Back to the top