Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp')
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java352
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java82
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java152
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ServerTCPConnectorImpl.java72
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java371
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java212
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java161
7 files changed, 1402 insertions, 0 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
new file mode 100644
index 0000000000..3724e0025e
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
@@ -0,0 +1,352 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.internal.net4j.transport.tcp;
+
+import org.eclipse.net4j.transport.Buffer;
+import org.eclipse.net4j.transport.Channel;
+import org.eclipse.net4j.transport.ConnectorException;
+import org.eclipse.net4j.transport.tcp.TCPConnector;
+import org.eclipse.net4j.transport.tcp.TCPSelector;
+import org.eclipse.net4j.transport.tcp.TCPSelectorListener;
+import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
+
+import org.eclipse.internal.net4j.transport.AbstractConnector;
+import org.eclipse.internal.net4j.transport.ChannelImpl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * @author Eike Stepper
+ */
+public abstract class AbstractTCPConnector extends AbstractConnector implements TCPConnector,
+ TCPSelectorListener.Active
+{
+ private SocketChannel socketChannel;
+
+ private TCPSelector selector;
+
+ private SelectionKey selectionKey;
+
+ private Buffer inputBuffer;
+
+ private ControlChannelImpl controlChannel;
+
+ public AbstractTCPConnector()
+ {
+ try
+ {
+ socketChannel = SocketChannel.open();
+ socketChannel.configureBlocking(false);
+ }
+ catch (IOException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ /**
+ * SocketChannel must already be non-blocking!
+ */
+ public AbstractTCPConnector(SocketChannel socketChannel)
+ {
+ this.socketChannel = socketChannel;
+ }
+
+ public TCPSelector getSelector()
+ {
+ return selector;
+ }
+
+ public void setSelector(TCPSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ public SocketChannel getSocketChannel()
+ {
+ return socketChannel;
+ }
+
+ public SelectionKey getSelectionKey()
+ {
+ return selectionKey;
+ }
+
+ /**
+ * Called by {@link ChannelImpl} each time a new buffer is available for
+ * multiplexing. This or another buffer can be dequeued from the outputQueue
+ * of the {@link ChannelImpl}.
+ */
+ public void multiplexBuffer(Channel channel)
+ {
+ TCPUtil.setWriteInterest(selectionKey, true);
+ }
+
+ public void handleConnect(TCPSelector selector, SocketChannel channel)
+ {
+ try
+ {
+ if (!channel.finishConnect())
+ {
+ return;
+ }
+ }
+ catch (Exception ex)
+ {
+ return;
+ }
+
+ try
+ {
+ TCPUtil.setConnectInterest(selectionKey, false);
+ TCPUtil.setReadInterest(selectionKey, true);
+ setState(State.NEGOTIATING);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ deactivate();
+ }
+ }
+
+ public void handleRead(TCPSelector selector, SocketChannel socketChannel)
+ {
+ // TODO Is this needed?
+ if (!socketChannel.isConnected())
+ {
+ deactivate();
+ return;
+ }
+
+ try
+ {
+ if (inputBuffer == null)
+ {
+ inputBuffer = getBufferProvider().provideBuffer();
+ }
+
+ ByteBuffer byteBuffer = inputBuffer.startGetting(socketChannel);
+ if (byteBuffer != null)
+ {
+ short channelID = inputBuffer.getChannelID();
+ ChannelImpl channel = channelID == ControlChannelImpl.CONTROL_CHANNEL_ID ? controlChannel
+ : getChannel(channelID);
+ if (channel != null)
+ {
+ channel.handleBufferFromMultiplexer(inputBuffer);
+ }
+ else
+ {
+ System.out.println(toString() + ": Discarding buffer from unknown channel");
+ inputBuffer.release();
+ }
+
+ inputBuffer = null;
+ }
+ }
+ catch (ClosedChannelException ex)
+ {
+ deactivate();
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ deactivate();
+ }
+ }
+
+ public void handleWrite(TCPSelector selector, SocketChannel socketChannel)
+ {
+ // TODO Is this needed?
+ if (!socketChannel.isConnected())
+ {
+ deactivate();
+ return;
+ }
+
+ try
+ {
+ boolean moreToWrite = false;
+ for (Queue<Buffer> bufferQueue : getChannelBufferQueues())
+ {
+ Buffer buffer = bufferQueue.peek();
+ if (buffer != null)
+ {
+ if (buffer.write(socketChannel))
+ {
+ bufferQueue.poll();
+ buffer.release();
+
+ if (!moreToWrite)
+ {
+ moreToWrite = !bufferQueue.isEmpty();
+ }
+ }
+ else
+ {
+ moreToWrite = true;
+ break;
+ }
+ }
+ }
+
+ if (!moreToWrite)
+ {
+ TCPUtil.setWriteInterest(selectionKey, false);
+ }
+ }
+ catch (ClosedChannelException ex)
+ {
+ deactivate();
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ deactivate();
+ }
+ }
+
+ @Override
+ protected List<Queue<Buffer>> getChannelBufferQueues()
+ {
+ List<Queue<Buffer>> queues = super.getChannelBufferQueues();
+ Queue<Buffer> controlQueue = controlChannel.getSendQueue();
+ if (!controlQueue.isEmpty())
+ {
+ queues.add(controlQueue);
+ }
+
+ return queues;
+ }
+
+ @Override
+ protected void registerChannelWithPeer(short channelID, String protocolID)
+ throws ConnectorException
+ {
+ if (!controlChannel.registerChannel(channelID, protocolID))
+ {
+ throw new ConnectorException("Failed to register channel with peer");
+ }
+ }
+
+ @Override
+ protected void removeChannel(ChannelImpl channel)
+ {
+ if (isConnected())
+ {
+ controlChannel.deregisterChannel(channel.getChannelID());
+ }
+
+ super.removeChannel(channel);
+ }
+
+ @Override
+ protected void onAccessBeforeActivate() throws Exception
+ {
+ super.onAccessBeforeActivate();
+ if (selector == null)
+ {
+ selector = TCPUtil.createTCPSelector();
+ LifecycleUtil.activate(selector);
+ }
+ }
+
+ @Override
+ protected void onActivate() throws Exception
+ {
+ super.onActivate();
+ controlChannel = new ControlChannelImpl(this);
+ controlChannel.activate();
+
+ selectionKey = selector.register(socketChannel, this);
+ if (getType() == Type.SERVER)
+ {
+ TCPUtil.setConnectInterest(selectionKey, false);
+ }
+ }
+
+ @Override
+ protected void onDeactivate() throws Exception
+ {
+ Exception exception = null;
+
+ try
+ {
+ selectionKey.cancel();
+ }
+ catch (RuntimeException ex)
+ {
+ if (exception == null)
+ {
+ exception = ex;
+ }
+ }
+ finally
+ {
+ selectionKey = null;
+ }
+
+ try
+ {
+ socketChannel.close();
+ }
+ catch (Exception ex)
+ {
+ if (exception == null)
+ {
+ exception = ex;
+ }
+ }
+ finally
+ {
+ socketChannel = null;
+ }
+
+ try
+ {
+ controlChannel.deactivate();
+ }
+ catch (Exception ex)
+ {
+ if (exception == null)
+ {
+ exception = ex;
+ }
+ }
+ finally
+ {
+ controlChannel = null;
+ }
+
+ try
+ {
+ super.onDeactivate();
+ }
+ catch (Exception ex)
+ {
+ if (exception == null)
+ {
+ exception = ex;
+ }
+ }
+
+ if (exception != null)
+ {
+ throw exception;
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java
new file mode 100644
index 0000000000..769ab52787
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java
@@ -0,0 +1,82 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.internal.net4j.transport.tcp;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+
+/**
+ * @author Eike Stepper
+ */
+public class ClientTCPConnectorImpl extends AbstractTCPConnector
+{
+ private String host;
+
+ private int port = DEFAULT_PORT;
+
+ public ClientTCPConnectorImpl()
+ {
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public void setHost(String host)
+ {
+ this.host = host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public void setPort(int port)
+ {
+ this.port = port;
+ }
+
+ public Type getType()
+ {
+ return Type.CLIENT;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ClientTCPConnector[" + host + ":" + port + "]";
+ }
+
+ @Override
+ protected void onAccessBeforeActivate() throws Exception
+ {
+ super.onAccessBeforeActivate();
+ if (host == null || host.length() == 0)
+ {
+ throw new IllegalArgumentException("host == null || host.length() == 0");
+ }
+ }
+
+ @Override
+ protected void onActivate() throws Exception
+ {
+ super.onActivate();
+ SelectionKey selKey = getSelectionKey();
+ selKey.interestOps(selKey.interestOps() | SelectionKey.OP_CONNECT);
+
+ InetAddress addr = InetAddress.getByName(host);
+ InetSocketAddress sAddr = new InetSocketAddress(addr, port);
+ getSocketChannel().connect(sAddr);
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java
new file mode 100644
index 0000000000..9a9e920d85
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java
@@ -0,0 +1,152 @@
+package org.eclipse.internal.net4j.transport.tcp;
+
+import org.eclipse.net4j.transport.Buffer;
+import org.eclipse.net4j.util.concurrent.Synchronizer;
+import org.eclipse.net4j.util.concurrent.SynchronizingCorrelator;
+
+import org.eclipse.internal.net4j.transport.BufferUtil;
+import org.eclipse.internal.net4j.transport.ChannelImpl;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author Eike Stepper
+ */
+public final class ControlChannelImpl extends ChannelImpl
+{
+ public static final short CONTROL_CHANNEL_ID = -1;
+
+ public static final long REGISTRATION_TIMEOUT = 500000;
+
+ public static final byte OPCODE_REGISTRATION = 1;
+
+ public static final byte OPCODE_REGISTRATION_ACK = 2;
+
+ public static final byte OPCODE_DEREGISTRATION = 3;
+
+ public static final byte SUCCESS = 1;
+
+ public static final byte FAILURE = 0;
+
+ private SynchronizingCorrelator<Short, Boolean> registrations = new SynchronizingCorrelator();
+
+ public ControlChannelImpl(AbstractTCPConnector connector)
+ {
+ super(connector.getReceiveExecutor());
+ setChannelID(CONTROL_CHANNEL_ID);
+ setConnector(connector);
+ }
+
+ public boolean registerChannel(short channelID, String protocolID)
+ {
+ assertValidChannelID(channelID);
+ Synchronizer<Boolean> registration = registrations.correlate(channelID);
+
+ Buffer buffer = provideBuffer();
+ ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_ID);
+ byteBuffer.put(OPCODE_REGISTRATION);
+ byteBuffer.putShort(channelID);
+ BufferUtil.putUTF8(byteBuffer, protocolID);
+ handleBuffer(buffer);
+
+ return registration.get(REGISTRATION_TIMEOUT);
+ }
+
+ public void deregisterChannel(short channelID)
+ {
+ assertValidChannelID(channelID);
+
+ Buffer buffer = provideBuffer();
+ ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_ID);
+ byteBuffer.put(OPCODE_DEREGISTRATION);
+ byteBuffer.putShort(channelID);
+ handleBuffer(buffer);
+ }
+
+ private void assertValidChannelID(short channelID)
+ {
+ if (channelID <= CONTROL_CHANNEL_ID)
+ {
+ throw new IllegalArgumentException("channelID <= CONTROL_CHANNEL_ID");
+ }
+ }
+
+ public void handleBufferFromMultiplexer(Buffer buffer)
+ {
+ try
+ {
+ ByteBuffer byteBuffer = buffer.getByteBuffer();
+ byte opcode = byteBuffer.get();
+ System.out.println("CONTROL: " + opcode);
+ switch (opcode)
+ {
+ case OPCODE_REGISTRATION:
+ {
+ short channelID = byteBuffer.getShort();
+ assertValidChannelID(channelID);
+ boolean success = true;
+
+ try
+ {
+ byte[] handlerFactoryUTF8 = BufferUtil.getByteArray(byteBuffer);
+ String protocolID = BufferUtil.fromUTF8(handlerFactoryUTF8);
+ ChannelImpl channel = ((AbstractTCPConnector)getConnector()).createChannel(channelID,
+ protocolID);
+ if (channel != null)
+ {
+ channel.activate();
+ }
+ else
+ {
+ success = false;
+ }
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ success = false;
+ }
+
+ sendStatus(OPCODE_REGISTRATION_ACK, channelID, success);
+ break;
+ }
+
+ case OPCODE_REGISTRATION_ACK:
+ {
+ short channelID = byteBuffer.getShort();
+ boolean success = byteBuffer.get() == SUCCESS;
+ registrations.put(channelID, success);
+ break;
+ }
+
+ case OPCODE_DEREGISTRATION:
+ throw new UnsupportedOperationException();
+
+ default:
+ System.out.println("Invalid opcode: " + opcode);
+ ((AbstractTCPConnector)getConnector()).deactivate();
+ break;
+ }
+ }
+ finally
+ {
+ buffer.release();
+ }
+ }
+
+ // private Buffer getBuffer()
+ // {
+ // return
+ // ((AbstractTCPConnector)getConnector()).getBufferProvider().provideBuffer();
+ // }
+
+ private void sendStatus(byte opcode, short channelID, boolean status)
+ {
+ Buffer buffer = provideBuffer();
+ ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_ID);
+ byteBuffer.put(opcode);
+ byteBuffer.putShort(channelID);
+ byteBuffer.put(status ? SUCCESS : FAILURE);
+ handleBuffer(buffer);
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ServerTCPConnectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ServerTCPConnectorImpl.java
new file mode 100644
index 0000000000..f79e8aeb6f
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ServerTCPConnectorImpl.java
@@ -0,0 +1,72 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.internal.net4j.transport.tcp;
+
+import org.eclipse.net4j.transport.BufferProvider;
+import org.eclipse.net4j.transport.ProtocolFactory;
+import org.eclipse.net4j.transport.tcp.TCPSelector;
+import org.eclipse.net4j.util.registry.IRegistry;
+
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @author Eike Stepper
+ */
+public class ServerTCPConnectorImpl extends AbstractTCPConnector
+{
+ public ServerTCPConnectorImpl(SocketChannel socketChannel, ExecutorService receiveExecutor,
+ IRegistry<String, ProtocolFactory> protocolFactoryRegistry, BufferProvider bufferProvider,
+ TCPSelector selector)
+ {
+ super(socketChannel);
+ setReceiveExecutor(receiveExecutor);
+ setProtocolFactoryRegistry(protocolFactoryRegistry);
+ setBufferProvider(bufferProvider);
+ setSelector(selector);
+ }
+
+ public Type getType()
+ {
+ return Type.SERVER;
+ }
+
+ public String getHost()
+ {
+ return getSocketChannel().socket().getInetAddress().getHostAddress();
+ }
+
+ public int getPort()
+ {
+ return getSocketChannel().socket().getPort();
+ }
+
+ @Override
+ public String toString()
+ {
+ try
+ {
+ SocketAddress address = getSocketChannel().socket().getRemoteSocketAddress();
+ return "ServerTCPConnector[" + address + "]";
+ }
+ catch (Exception ex)
+ {
+ return "ServerTCPConnector";
+ }
+ }
+
+ @Override
+ protected void onDeactivate() throws Exception
+ {
+ super.onDeactivate();
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java
new file mode 100644
index 0000000000..801c281c32
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java
@@ -0,0 +1,371 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.internal.net4j.transport.tcp;
+
+import org.eclipse.net4j.transport.Buffer;
+import org.eclipse.net4j.transport.BufferProvider;
+import org.eclipse.net4j.transport.ProtocolFactory;
+import org.eclipse.net4j.transport.tcp.TCPAcceptor;
+import org.eclipse.net4j.transport.tcp.TCPAcceptorListener;
+import org.eclipse.net4j.transport.tcp.TCPConnector;
+import org.eclipse.net4j.transport.tcp.TCPSelector;
+import org.eclipse.net4j.transport.tcp.TCPSelectorListener;
+import org.eclipse.net4j.util.lifecycle.AbstractLifecycle;
+import org.eclipse.net4j.util.lifecycle.LifecycleListener;
+import org.eclipse.net4j.util.lifecycle.LifecycleNotifier;
+import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
+import org.eclipse.net4j.util.registry.IRegistry;
+
+import org.eclipse.internal.net4j.transport.ChannelImpl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @author Eike Stepper
+ */
+public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, BufferProvider,
+ TCPSelectorListener.Passive, LifecycleListener
+{
+ private IRegistry<String, ProtocolFactory> protocolFactoryRegistry;
+
+ private BufferProvider bufferProvider;
+
+ private TCPSelector selector;
+
+ private String listenAddr = DEFAULT_ADDRESS;
+
+ private int listenPort = DEFAULT_PORT;
+
+ private ServerSocketChannel serverSocketChannel;
+
+ private SelectionKey selectionKey;
+
+ private Set<TCPConnector> acceptedConnectors = new HashSet();
+
+ /**
+ * An optional executor to be used by the {@link Channel}s to process their
+ * {@link ChannelImpl#receiveQueue} instead of the current thread. If not
+ * <code>null</code> the calling thread of
+ * {@link ChannelImpl#handleBufferFromMultiplexer(Buffer)} becomes decoupled.
+ * <p>
+ */
+ private ExecutorService receiveExecutor;
+
+ /**
+ * Don't initialize lazily to circumvent synchronization!
+ */
+ private Queue<TCPAcceptorListener> listeners = new ConcurrentLinkedQueue();
+
+ public TCPAcceptorImpl()
+ {
+ }
+
+ public ExecutorService getReceiveExecutor()
+ {
+ return receiveExecutor;
+ }
+
+ public void setReceiveExecutor(ExecutorService receiveExecutor)
+ {
+ this.receiveExecutor = receiveExecutor;
+ }
+
+ public IRegistry<String, ProtocolFactory> getProtocolFactoryRegistry()
+ {
+ return protocolFactoryRegistry;
+ }
+
+ public void setProtocolFactoryRegistry(IRegistry<String, ProtocolFactory> protocolFactoryRegistry)
+ {
+ this.protocolFactoryRegistry = protocolFactoryRegistry;
+ }
+
+ public BufferProvider getBufferProvider()
+ {
+ return bufferProvider;
+ }
+
+ public void setBufferProvider(BufferProvider bufferProvider)
+ {
+ this.bufferProvider = bufferProvider;
+ }
+
+ public short getBufferCapacity()
+ {
+ return bufferProvider.getBufferCapacity();
+ }
+
+ public Buffer provideBuffer()
+ {
+ return bufferProvider.provideBuffer();
+ }
+
+ public void retainBuffer(Buffer buffer)
+ {
+ bufferProvider.retainBuffer(buffer);
+ }
+
+ public TCPSelector getSelector()
+ {
+ return selector;
+ }
+
+ public void setSelector(TCPSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ public String getAddress()
+ {
+ return listenAddr;
+ }
+
+ public void setListenAddr(String listenAddr)
+ {
+ this.listenAddr = listenAddr;
+ }
+
+ public int getPort()
+ {
+ return listenPort;
+ }
+
+ public void setListenPort(int listenPort)
+ {
+ this.listenPort = listenPort;
+ }
+
+ public TCPConnector[] getAcceptedConnectors()
+ {
+ ArrayList<TCPConnector> result;
+ synchronized (acceptedConnectors)
+ {
+ result = new ArrayList<TCPConnector>(acceptedConnectors);
+ }
+
+ return result.toArray(new TCPConnector[result.size()]);
+ }
+
+ public void addListener(TCPAcceptorListener listener)
+ {
+ listeners.add(listener);
+ }
+
+ public void removeListener(TCPAcceptorListener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ public void notifyLifecycleActivated(LifecycleNotifier notifier)
+ {
+ // Do nothing
+ }
+
+ public void notifyLifecycleDeactivating(LifecycleNotifier notifier)
+ {
+ synchronized (acceptedConnectors)
+ {
+ notifier.removeLifecycleListener(this);
+ acceptedConnectors.remove(notifier);
+ }
+ }
+
+ public void handleAccept(TCPSelector selector, ServerSocketChannel serverSocketChannel)
+ {
+ try
+ {
+ SocketChannel socketChannel = serverSocketChannel.accept();
+ if (socketChannel != null)
+ {
+ socketChannel.configureBlocking(false);
+ addConnector(socketChannel);
+ }
+ }
+ catch (ClosedByInterruptException ex)
+ {
+ deactivate();
+ }
+ catch (Exception ex)
+ {
+ if (isActive())
+ {
+ ex.printStackTrace();
+ }
+
+ deactivate();
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TCPAcceptor[" + "/" + listenAddr + ":" + listenPort + "]";
+ }
+
+ protected void addConnector(SocketChannel socketChannel)
+ {
+ try
+ {
+ AbstractTCPConnector connector = createConnector(socketChannel);
+ connector.activate();
+ connector.addLifecycleListener(this);
+
+ synchronized (acceptedConnectors)
+ {
+ acceptedConnectors.add(connector);
+ }
+
+ fireConnectorAccepted(connector);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+
+ try
+ {
+ socketChannel.close();
+ }
+ catch (IOException ioex)
+ {
+ ioex.printStackTrace();
+ }
+ }
+ }
+
+ protected AbstractTCPConnector createConnector(SocketChannel socketChannel)
+ {
+ return new ServerTCPConnectorImpl(socketChannel, getReceiveExecutor(),
+ getProtocolFactoryRegistry(), bufferProvider, selector);
+ }
+
+ protected void fireConnectorAccepted(TCPConnector connector)
+ {
+ for (TCPAcceptorListener listener : listeners)
+ {
+ try
+ {
+ listener.notifyConnectorAccepted(this, connector);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ protected void onAccessBeforeActivate() throws Exception
+ {
+ super.onAccessBeforeActivate();
+ if (bufferProvider == null)
+ {
+ throw new IllegalStateException("bufferProvider == null");
+ }
+
+ if (protocolFactoryRegistry == null)
+ {
+ System.out.println(toString() + ": (INFO) protocolFactoryRegistry == null");
+ }
+
+ if (receiveExecutor == null)
+ {
+ System.out.println(toString() + ": (INFO) receiveExecutor == null");
+ }
+
+ if (selector == null)
+ {
+ selector = TCPUtil.createTCPSelector();
+ LifecycleUtil.activate(selector);
+ }
+ }
+
+ @Override
+ protected void onActivate() throws Exception
+ {
+ super.onActivate();
+ InetAddress addr = InetAddress.getByName(listenAddr);
+ InetSocketAddress sAddr = new InetSocketAddress(addr, listenPort);
+
+ serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(false);
+ serverSocketChannel.socket().bind(sAddr);
+
+ selectionKey = selector.register(serverSocketChannel, this);
+ }
+
+ @Override
+ protected void onDeactivate() throws Exception
+ {
+ for (TCPConnector connector : getAcceptedConnectors())
+ {
+ try
+ {
+ LifecycleUtil.deactivate(connector);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ Exception exception = null;
+
+ try
+ {
+ selectionKey.cancel();
+ }
+ catch (RuntimeException ex)
+ {
+ if (exception == null)
+ {
+ exception = ex;
+ }
+ }
+ finally
+ {
+ selectionKey = null;
+ }
+
+ try
+ {
+ serverSocketChannel.close();
+ }
+ catch (RuntimeException ex)
+ {
+ if (exception == null)
+ {
+ exception = ex;
+ }
+ }
+ finally
+ {
+ serverSocketChannel = null;
+ }
+
+ if (exception != null)
+ {
+ throw exception;
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
new file mode 100644
index 0000000000..ec1d1c66ed
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java
@@ -0,0 +1,212 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.internal.net4j.transport.tcp;
+
+import org.eclipse.net4j.transport.tcp.TCPSelector;
+import org.eclipse.net4j.transport.tcp.TCPSelectorListener;
+import org.eclipse.net4j.util.lifecycle.AbstractLifecycle;
+
+import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+
+/**
+ * @author Eike Stepper
+ */
+public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, Runnable
+{
+ private static final long SELECT_TIMEOUT = 100;
+
+ private Selector selector;
+
+ private Thread thread;
+
+ public TCPSelectorImpl()
+ {
+ }
+
+ public SelectionKey register(ServerSocketChannel channel, TCPSelectorListener.Passive listener)
+ throws ClosedChannelException
+ {
+ if (listener == null)
+ {
+ throw new IllegalArgumentException("listener == null");
+ }
+
+ System.out.println(toString() + ": Registering " + TCPUtil.toString(channel));
+ return channel.register(selector, SelectionKey.OP_ACCEPT, listener);
+ }
+
+ public SelectionKey register(SocketChannel channel, TCPSelectorListener.Active listener)
+ throws ClosedChannelException
+ {
+ if (listener == null)
+ {
+ throw new IllegalArgumentException("listener == null");
+ }
+
+ System.out.println(toString() + ": Registering " + TCPUtil.toString(channel));
+ return channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener);
+ }
+
+ public void run()
+ {
+ while (isActive())
+ {
+ if (Thread.interrupted())
+ {
+ deactivate();
+ break;
+ }
+
+ try
+ {
+ if (selector.select(SELECT_TIMEOUT) > 0)
+ {
+ Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+ while (it.hasNext())
+ {
+ SelectionKey selKey = it.next();
+ it.remove();
+
+ try
+ {
+ handleSelection(selKey);
+ }
+ catch (CancelledKeyException ignore)
+ {
+ ; // Do nothing
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ selKey.cancel();
+ }
+ }
+ }
+ }
+ catch (ClosedSelectorException ex)
+ {
+ break;
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ deactivate();
+ break;
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TCPSelector";
+ }
+
+ protected void handleSelection(SelectionKey selKey) throws IOException
+ {
+ SelectableChannel channel = selKey.channel();
+ if (channel instanceof ServerSocketChannel)
+ {
+ ServerSocketChannel ssChannel = (ServerSocketChannel)selKey.channel();
+ TCPSelectorListener.Passive listener = (TCPSelectorListener.Passive)selKey.attachment();
+
+ if (selKey.isAcceptable())
+ {
+ System.out.println(toString() + ": Accepting " + TCPUtil.toString(ssChannel));
+ listener.handleAccept(this, ssChannel);
+ }
+ }
+ else if (channel instanceof SocketChannel)
+ {
+ SocketChannel sChannel = (SocketChannel)channel;
+ TCPSelectorListener.Active listener = (TCPSelectorListener.Active)selKey.attachment();
+
+ if (selKey.isConnectable())
+ {
+ System.out.println(toString() + ": Connecting " + TCPUtil.toString(sChannel));
+ listener.handleConnect(this, sChannel);
+ }
+
+ if (selKey.isReadable())
+ {
+ System.out.println(toString() + ": Reading " + TCPUtil.toString(sChannel));
+ listener.handleRead(this, sChannel);
+ }
+
+ if (selKey.isWritable())
+ {
+ System.out.println(toString() + ": Writing " + TCPUtil.toString(sChannel));
+ listener.handleWrite(this, sChannel);
+ }
+ }
+ }
+
+ @Override
+ protected void onActivate() throws Exception
+ {
+ selector = Selector.open();
+ thread = new Thread(this);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ @Override
+ protected void onDeactivate() throws Exception
+ {
+ Exception exception = null;
+
+ try
+ {
+ thread.join(2 * SELECT_TIMEOUT);
+ }
+ catch (RuntimeException ex)
+ {
+ if (exception == null)
+ {
+ exception = ex;
+ }
+ }
+ finally
+ {
+ thread = null;
+ }
+
+ try
+ {
+ selector.close();
+ }
+ catch (Exception ex)
+ {
+ if (exception == null)
+ {
+ exception = ex;
+ }
+ }
+ finally
+ {
+ selector = null;
+ }
+
+ if (exception != null)
+ {
+ throw exception;
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java
new file mode 100644
index 0000000000..634ca4389c
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java
@@ -0,0 +1,161 @@
+/***************************************************************************
+ * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.internal.net4j.transport.tcp;
+
+import org.eclipse.net4j.transport.BufferProvider;
+import org.eclipse.net4j.transport.tcp.TCPAcceptor;
+import org.eclipse.net4j.transport.tcp.TCPConnector;
+import org.eclipse.net4j.transport.tcp.TCPSelector;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ * @author Eike Stepper
+ */
+public final class TCPUtil
+{
+ private TCPUtil()
+ {
+ }
+
+ public static TCPSelector createTCPSelector()
+ {
+ TCPSelectorImpl selector = new TCPSelectorImpl();
+ return selector;
+ }
+
+ public static TCPAcceptor createTCPAcceptor(BufferProvider bufferProvider, TCPSelector selector,
+ String address, int port)
+ {
+ TCPAcceptorImpl acceptor = new TCPAcceptorImpl();
+ acceptor.setBufferProvider(bufferProvider);
+ acceptor.setSelector(selector);
+ acceptor.setListenPort(port);
+ acceptor.setListenAddr(address);
+ return acceptor;
+ }
+
+ public static TCPAcceptor createTCPAcceptor(BufferProvider bufferProvider, TCPSelector selector)
+ {
+ return createTCPAcceptor(bufferProvider, selector, TCPAcceptor.DEFAULT_ADDRESS,
+ TCPAcceptor.DEFAULT_PORT);
+ }
+
+ public static TCPConnector createTCPConnector(BufferProvider bufferProvider,
+ TCPSelector selector, String host, int port)
+ {
+ ClientTCPConnectorImpl connector = new ClientTCPConnectorImpl();
+ connector.setBufferProvider(bufferProvider);
+ connector.setSelector(selector);
+ connector.setHost(host);
+ connector.setPort(port);
+ return connector;
+ }
+
+ public static TCPConnector createTCPConnector(BufferProvider bufferProvider,
+ TCPSelector selector, String host)
+ {
+ return createTCPConnector(bufferProvider, selector, host, TCPConnector.DEFAULT_PORT);
+ }
+
+ public static String toString(ServerSocketChannel channel)
+ {
+ return channel.toString();
+ // return "ServerSocketChannel[" + channel.socket().getLocalSocketAddress()
+ // + "]";
+ }
+
+ public static String toString(SocketChannel channel)
+ {
+ return channel.toString();
+ // return "SocketChannel[" + channel.socket().getRemoteSocketAddress() +
+ // "]";
+ }
+
+ public static String formatInterestOps(int newOps)
+ {
+ StringBuilder builder = new StringBuilder();
+ if ((newOps & SelectionKey.OP_ACCEPT) != 0)
+ {
+ addInterestOp(builder, "ACCEPT");
+ }
+
+ if ((newOps & SelectionKey.OP_CONNECT) != 0)
+ {
+ addInterestOp(builder, "CONNECT");
+ }
+
+ if ((newOps & SelectionKey.OP_READ) != 0)
+ {
+ addInterestOp(builder, "READ");
+ }
+
+ if ((newOps & SelectionKey.OP_WRITE) != 0)
+ {
+ addInterestOp(builder, "WRITE");
+ }
+
+ return builder.toString();
+ }
+
+ public static void setInterest(SelectionKey selectionKey, int operation, boolean interested)
+ {
+ int newOps;
+ int oldOps = selectionKey.interestOps();
+ if (interested)
+ {
+ newOps = oldOps | operation;
+ }
+ else
+ {
+ newOps = oldOps & ~operation;
+ }
+
+ if (oldOps != newOps)
+ {
+ System.out.println(selectionKey.channel().toString() + ": Setting interest "
+ + formatInterestOps(newOps) + " (was " + formatInterestOps(oldOps).toLowerCase() + ")");
+ selectionKey.interestOps(newOps);
+ }
+ }
+
+ public static void setAcceptInterest(SelectionKey selectionKey, boolean interested)
+ {
+ setInterest(selectionKey, SelectionKey.OP_ACCEPT, interested);
+ }
+
+ public static void setConnectInterest(SelectionKey selectionKey, boolean interested)
+ {
+ setInterest(selectionKey, SelectionKey.OP_CONNECT, interested);
+ }
+
+ public static void setReadInterest(SelectionKey selectionKey, boolean interested)
+ {
+ setInterest(selectionKey, SelectionKey.OP_READ, interested);
+ }
+
+ public static void setWriteInterest(SelectionKey selectionKey, boolean interested)
+ {
+ setInterest(selectionKey, SelectionKey.OP_WRITE, interested);
+ }
+
+ private static void addInterestOp(StringBuilder builder, String op)
+ {
+ if (builder.length() != 0)
+ {
+ builder.append("|");
+ }
+
+ builder.append(op);
+ }
+}

Back to the top