diff options
Diffstat (limited to 'plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp')
7 files changed, 1402 insertions, 0 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java new file mode 100644 index 0000000000..3724e0025e --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java @@ -0,0 +1,352 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.internal.net4j.transport.tcp; + +import org.eclipse.net4j.transport.Buffer; +import org.eclipse.net4j.transport.Channel; +import org.eclipse.net4j.transport.ConnectorException; +import org.eclipse.net4j.transport.tcp.TCPConnector; +import org.eclipse.net4j.transport.tcp.TCPSelector; +import org.eclipse.net4j.transport.tcp.TCPSelectorListener; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; + +import org.eclipse.internal.net4j.transport.AbstractConnector; +import org.eclipse.internal.net4j.transport.ChannelImpl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.List; +import java.util.Queue; + +/** + * @author Eike Stepper + */ +public abstract class AbstractTCPConnector extends AbstractConnector implements TCPConnector, + TCPSelectorListener.Active +{ + private SocketChannel socketChannel; + + private TCPSelector selector; + + private SelectionKey selectionKey; + + private Buffer inputBuffer; + + private ControlChannelImpl controlChannel; + + public AbstractTCPConnector() + { + try + { + socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + } + catch (IOException ex) + { + ex.printStackTrace(); + } + } + + /** + * SocketChannel must already be non-blocking! + */ + public AbstractTCPConnector(SocketChannel socketChannel) + { + this.socketChannel = socketChannel; + } + + public TCPSelector getSelector() + { + return selector; + } + + public void setSelector(TCPSelector selector) + { + this.selector = selector; + } + + public SocketChannel getSocketChannel() + { + return socketChannel; + } + + public SelectionKey getSelectionKey() + { + return selectionKey; + } + + /** + * Called by {@link ChannelImpl} each time a new buffer is available for + * multiplexing. This or another buffer can be dequeued from the outputQueue + * of the {@link ChannelImpl}. + */ + public void multiplexBuffer(Channel channel) + { + TCPUtil.setWriteInterest(selectionKey, true); + } + + public void handleConnect(TCPSelector selector, SocketChannel channel) + { + try + { + if (!channel.finishConnect()) + { + return; + } + } + catch (Exception ex) + { + return; + } + + try + { + TCPUtil.setConnectInterest(selectionKey, false); + TCPUtil.setReadInterest(selectionKey, true); + setState(State.NEGOTIATING); + } + catch (Exception ex) + { + ex.printStackTrace(); + deactivate(); + } + } + + public void handleRead(TCPSelector selector, SocketChannel socketChannel) + { + // TODO Is this needed? + if (!socketChannel.isConnected()) + { + deactivate(); + return; + } + + try + { + if (inputBuffer == null) + { + inputBuffer = getBufferProvider().provideBuffer(); + } + + ByteBuffer byteBuffer = inputBuffer.startGetting(socketChannel); + if (byteBuffer != null) + { + short channelID = inputBuffer.getChannelID(); + ChannelImpl channel = channelID == ControlChannelImpl.CONTROL_CHANNEL_ID ? controlChannel + : getChannel(channelID); + if (channel != null) + { + channel.handleBufferFromMultiplexer(inputBuffer); + } + else + { + System.out.println(toString() + ": Discarding buffer from unknown channel"); + inputBuffer.release(); + } + + inputBuffer = null; + } + } + catch (ClosedChannelException ex) + { + deactivate(); + } + catch (Exception ex) + { + ex.printStackTrace(); + deactivate(); + } + } + + public void handleWrite(TCPSelector selector, SocketChannel socketChannel) + { + // TODO Is this needed? + if (!socketChannel.isConnected()) + { + deactivate(); + return; + } + + try + { + boolean moreToWrite = false; + for (Queue<Buffer> bufferQueue : getChannelBufferQueues()) + { + Buffer buffer = bufferQueue.peek(); + if (buffer != null) + { + if (buffer.write(socketChannel)) + { + bufferQueue.poll(); + buffer.release(); + + if (!moreToWrite) + { + moreToWrite = !bufferQueue.isEmpty(); + } + } + else + { + moreToWrite = true; + break; + } + } + } + + if (!moreToWrite) + { + TCPUtil.setWriteInterest(selectionKey, false); + } + } + catch (ClosedChannelException ex) + { + deactivate(); + } + catch (Exception ex) + { + ex.printStackTrace(); + deactivate(); + } + } + + @Override + protected List<Queue<Buffer>> getChannelBufferQueues() + { + List<Queue<Buffer>> queues = super.getChannelBufferQueues(); + Queue<Buffer> controlQueue = controlChannel.getSendQueue(); + if (!controlQueue.isEmpty()) + { + queues.add(controlQueue); + } + + return queues; + } + + @Override + protected void registerChannelWithPeer(short channelID, String protocolID) + throws ConnectorException + { + if (!controlChannel.registerChannel(channelID, protocolID)) + { + throw new ConnectorException("Failed to register channel with peer"); + } + } + + @Override + protected void removeChannel(ChannelImpl channel) + { + if (isConnected()) + { + controlChannel.deregisterChannel(channel.getChannelID()); + } + + super.removeChannel(channel); + } + + @Override + protected void onAccessBeforeActivate() throws Exception + { + super.onAccessBeforeActivate(); + if (selector == null) + { + selector = TCPUtil.createTCPSelector(); + LifecycleUtil.activate(selector); + } + } + + @Override + protected void onActivate() throws Exception + { + super.onActivate(); + controlChannel = new ControlChannelImpl(this); + controlChannel.activate(); + + selectionKey = selector.register(socketChannel, this); + if (getType() == Type.SERVER) + { + TCPUtil.setConnectInterest(selectionKey, false); + } + } + + @Override + protected void onDeactivate() throws Exception + { + Exception exception = null; + + try + { + selectionKey.cancel(); + } + catch (RuntimeException ex) + { + if (exception == null) + { + exception = ex; + } + } + finally + { + selectionKey = null; + } + + try + { + socketChannel.close(); + } + catch (Exception ex) + { + if (exception == null) + { + exception = ex; + } + } + finally + { + socketChannel = null; + } + + try + { + controlChannel.deactivate(); + } + catch (Exception ex) + { + if (exception == null) + { + exception = ex; + } + } + finally + { + controlChannel = null; + } + + try + { + super.onDeactivate(); + } + catch (Exception ex) + { + if (exception == null) + { + exception = ex; + } + } + + if (exception != null) + { + throw exception; + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java new file mode 100644 index 0000000000..769ab52787 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ClientTCPConnectorImpl.java @@ -0,0 +1,82 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.internal.net4j.transport.tcp; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; + +/** + * @author Eike Stepper + */ +public class ClientTCPConnectorImpl extends AbstractTCPConnector +{ + private String host; + + private int port = DEFAULT_PORT; + + public ClientTCPConnectorImpl() + { + } + + public String getHost() + { + return host; + } + + public void setHost(String host) + { + this.host = host; + } + + public int getPort() + { + return port; + } + + public void setPort(int port) + { + this.port = port; + } + + public Type getType() + { + return Type.CLIENT; + } + + @Override + public String toString() + { + return "ClientTCPConnector[" + host + ":" + port + "]"; + } + + @Override + protected void onAccessBeforeActivate() throws Exception + { + super.onAccessBeforeActivate(); + if (host == null || host.length() == 0) + { + throw new IllegalArgumentException("host == null || host.length() == 0"); + } + } + + @Override + protected void onActivate() throws Exception + { + super.onActivate(); + SelectionKey selKey = getSelectionKey(); + selKey.interestOps(selKey.interestOps() | SelectionKey.OP_CONNECT); + + InetAddress addr = InetAddress.getByName(host); + InetSocketAddress sAddr = new InetSocketAddress(addr, port); + getSocketChannel().connect(sAddr); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java new file mode 100644 index 0000000000..9a9e920d85 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java @@ -0,0 +1,152 @@ +package org.eclipse.internal.net4j.transport.tcp; + +import org.eclipse.net4j.transport.Buffer; +import org.eclipse.net4j.util.concurrent.Synchronizer; +import org.eclipse.net4j.util.concurrent.SynchronizingCorrelator; + +import org.eclipse.internal.net4j.transport.BufferUtil; +import org.eclipse.internal.net4j.transport.ChannelImpl; + +import java.nio.ByteBuffer; + +/** + * @author Eike Stepper + */ +public final class ControlChannelImpl extends ChannelImpl +{ + public static final short CONTROL_CHANNEL_ID = -1; + + public static final long REGISTRATION_TIMEOUT = 500000; + + public static final byte OPCODE_REGISTRATION = 1; + + public static final byte OPCODE_REGISTRATION_ACK = 2; + + public static final byte OPCODE_DEREGISTRATION = 3; + + public static final byte SUCCESS = 1; + + public static final byte FAILURE = 0; + + private SynchronizingCorrelator<Short, Boolean> registrations = new SynchronizingCorrelator(); + + public ControlChannelImpl(AbstractTCPConnector connector) + { + super(connector.getReceiveExecutor()); + setChannelID(CONTROL_CHANNEL_ID); + setConnector(connector); + } + + public boolean registerChannel(short channelID, String protocolID) + { + assertValidChannelID(channelID); + Synchronizer<Boolean> registration = registrations.correlate(channelID); + + Buffer buffer = provideBuffer(); + ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_ID); + byteBuffer.put(OPCODE_REGISTRATION); + byteBuffer.putShort(channelID); + BufferUtil.putUTF8(byteBuffer, protocolID); + handleBuffer(buffer); + + return registration.get(REGISTRATION_TIMEOUT); + } + + public void deregisterChannel(short channelID) + { + assertValidChannelID(channelID); + + Buffer buffer = provideBuffer(); + ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_ID); + byteBuffer.put(OPCODE_DEREGISTRATION); + byteBuffer.putShort(channelID); + handleBuffer(buffer); + } + + private void assertValidChannelID(short channelID) + { + if (channelID <= CONTROL_CHANNEL_ID) + { + throw new IllegalArgumentException("channelID <= CONTROL_CHANNEL_ID"); + } + } + + public void handleBufferFromMultiplexer(Buffer buffer) + { + try + { + ByteBuffer byteBuffer = buffer.getByteBuffer(); + byte opcode = byteBuffer.get(); + System.out.println("CONTROL: " + opcode); + switch (opcode) + { + case OPCODE_REGISTRATION: + { + short channelID = byteBuffer.getShort(); + assertValidChannelID(channelID); + boolean success = true; + + try + { + byte[] handlerFactoryUTF8 = BufferUtil.getByteArray(byteBuffer); + String protocolID = BufferUtil.fromUTF8(handlerFactoryUTF8); + ChannelImpl channel = ((AbstractTCPConnector)getConnector()).createChannel(channelID, + protocolID); + if (channel != null) + { + channel.activate(); + } + else + { + success = false; + } + } + catch (Exception ex) + { + ex.printStackTrace(); + success = false; + } + + sendStatus(OPCODE_REGISTRATION_ACK, channelID, success); + break; + } + + case OPCODE_REGISTRATION_ACK: + { + short channelID = byteBuffer.getShort(); + boolean success = byteBuffer.get() == SUCCESS; + registrations.put(channelID, success); + break; + } + + case OPCODE_DEREGISTRATION: + throw new UnsupportedOperationException(); + + default: + System.out.println("Invalid opcode: " + opcode); + ((AbstractTCPConnector)getConnector()).deactivate(); + break; + } + } + finally + { + buffer.release(); + } + } + + // private Buffer getBuffer() + // { + // return + // ((AbstractTCPConnector)getConnector()).getBufferProvider().provideBuffer(); + // } + + private void sendStatus(byte opcode, short channelID, boolean status) + { + Buffer buffer = provideBuffer(); + ByteBuffer byteBuffer = buffer.startPutting(CONTROL_CHANNEL_ID); + byteBuffer.put(opcode); + byteBuffer.putShort(channelID); + byteBuffer.put(status ? SUCCESS : FAILURE); + handleBuffer(buffer); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ServerTCPConnectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ServerTCPConnectorImpl.java new file mode 100644 index 0000000000..f79e8aeb6f --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ServerTCPConnectorImpl.java @@ -0,0 +1,72 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.internal.net4j.transport.tcp; + +import org.eclipse.net4j.transport.BufferProvider; +import org.eclipse.net4j.transport.ProtocolFactory; +import org.eclipse.net4j.transport.tcp.TCPSelector; +import org.eclipse.net4j.util.registry.IRegistry; + +import java.net.SocketAddress; +import java.nio.channels.SocketChannel; +import java.util.concurrent.ExecutorService; + +/** + * @author Eike Stepper + */ +public class ServerTCPConnectorImpl extends AbstractTCPConnector +{ + public ServerTCPConnectorImpl(SocketChannel socketChannel, ExecutorService receiveExecutor, + IRegistry<String, ProtocolFactory> protocolFactoryRegistry, BufferProvider bufferProvider, + TCPSelector selector) + { + super(socketChannel); + setReceiveExecutor(receiveExecutor); + setProtocolFactoryRegistry(protocolFactoryRegistry); + setBufferProvider(bufferProvider); + setSelector(selector); + } + + public Type getType() + { + return Type.SERVER; + } + + public String getHost() + { + return getSocketChannel().socket().getInetAddress().getHostAddress(); + } + + public int getPort() + { + return getSocketChannel().socket().getPort(); + } + + @Override + public String toString() + { + try + { + SocketAddress address = getSocketChannel().socket().getRemoteSocketAddress(); + return "ServerTCPConnector[" + address + "]"; + } + catch (Exception ex) + { + return "ServerTCPConnector"; + } + } + + @Override + protected void onDeactivate() throws Exception + { + super.onDeactivate(); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java new file mode 100644 index 0000000000..801c281c32 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPAcceptorImpl.java @@ -0,0 +1,371 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.internal.net4j.transport.tcp; + +import org.eclipse.net4j.transport.Buffer; +import org.eclipse.net4j.transport.BufferProvider; +import org.eclipse.net4j.transport.ProtocolFactory; +import org.eclipse.net4j.transport.tcp.TCPAcceptor; +import org.eclipse.net4j.transport.tcp.TCPAcceptorListener; +import org.eclipse.net4j.transport.tcp.TCPConnector; +import org.eclipse.net4j.transport.tcp.TCPSelector; +import org.eclipse.net4j.transport.tcp.TCPSelectorListener; +import org.eclipse.net4j.util.lifecycle.AbstractLifecycle; +import org.eclipse.net4j.util.lifecycle.LifecycleListener; +import org.eclipse.net4j.util.lifecycle.LifecycleNotifier; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.registry.IRegistry; + +import org.eclipse.internal.net4j.transport.ChannelImpl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.channels.Channel; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; + +/** + * @author Eike Stepper + */ +public class TCPAcceptorImpl extends AbstractLifecycle implements TCPAcceptor, BufferProvider, + TCPSelectorListener.Passive, LifecycleListener +{ + private IRegistry<String, ProtocolFactory> protocolFactoryRegistry; + + private BufferProvider bufferProvider; + + private TCPSelector selector; + + private String listenAddr = DEFAULT_ADDRESS; + + private int listenPort = DEFAULT_PORT; + + private ServerSocketChannel serverSocketChannel; + + private SelectionKey selectionKey; + + private Set<TCPConnector> acceptedConnectors = new HashSet(); + + /** + * An optional executor to be used by the {@link Channel}s to process their + * {@link ChannelImpl#receiveQueue} instead of the current thread. If not + * <code>null</code> the calling thread of + * {@link ChannelImpl#handleBufferFromMultiplexer(Buffer)} becomes decoupled. + * <p> + */ + private ExecutorService receiveExecutor; + + /** + * Don't initialize lazily to circumvent synchronization! + */ + private Queue<TCPAcceptorListener> listeners = new ConcurrentLinkedQueue(); + + public TCPAcceptorImpl() + { + } + + public ExecutorService getReceiveExecutor() + { + return receiveExecutor; + } + + public void setReceiveExecutor(ExecutorService receiveExecutor) + { + this.receiveExecutor = receiveExecutor; + } + + public IRegistry<String, ProtocolFactory> getProtocolFactoryRegistry() + { + return protocolFactoryRegistry; + } + + public void setProtocolFactoryRegistry(IRegistry<String, ProtocolFactory> protocolFactoryRegistry) + { + this.protocolFactoryRegistry = protocolFactoryRegistry; + } + + public BufferProvider getBufferProvider() + { + return bufferProvider; + } + + public void setBufferProvider(BufferProvider bufferProvider) + { + this.bufferProvider = bufferProvider; + } + + public short getBufferCapacity() + { + return bufferProvider.getBufferCapacity(); + } + + public Buffer provideBuffer() + { + return bufferProvider.provideBuffer(); + } + + public void retainBuffer(Buffer buffer) + { + bufferProvider.retainBuffer(buffer); + } + + public TCPSelector getSelector() + { + return selector; + } + + public void setSelector(TCPSelector selector) + { + this.selector = selector; + } + + public String getAddress() + { + return listenAddr; + } + + public void setListenAddr(String listenAddr) + { + this.listenAddr = listenAddr; + } + + public int getPort() + { + return listenPort; + } + + public void setListenPort(int listenPort) + { + this.listenPort = listenPort; + } + + public TCPConnector[] getAcceptedConnectors() + { + ArrayList<TCPConnector> result; + synchronized (acceptedConnectors) + { + result = new ArrayList<TCPConnector>(acceptedConnectors); + } + + return result.toArray(new TCPConnector[result.size()]); + } + + public void addListener(TCPAcceptorListener listener) + { + listeners.add(listener); + } + + public void removeListener(TCPAcceptorListener listener) + { + listeners.remove(listener); + } + + public void notifyLifecycleActivated(LifecycleNotifier notifier) + { + // Do nothing + } + + public void notifyLifecycleDeactivating(LifecycleNotifier notifier) + { + synchronized (acceptedConnectors) + { + notifier.removeLifecycleListener(this); + acceptedConnectors.remove(notifier); + } + } + + public void handleAccept(TCPSelector selector, ServerSocketChannel serverSocketChannel) + { + try + { + SocketChannel socketChannel = serverSocketChannel.accept(); + if (socketChannel != null) + { + socketChannel.configureBlocking(false); + addConnector(socketChannel); + } + } + catch (ClosedByInterruptException ex) + { + deactivate(); + } + catch (Exception ex) + { + if (isActive()) + { + ex.printStackTrace(); + } + + deactivate(); + } + } + + @Override + public String toString() + { + return "TCPAcceptor[" + "/" + listenAddr + ":" + listenPort + "]"; + } + + protected void addConnector(SocketChannel socketChannel) + { + try + { + AbstractTCPConnector connector = createConnector(socketChannel); + connector.activate(); + connector.addLifecycleListener(this); + + synchronized (acceptedConnectors) + { + acceptedConnectors.add(connector); + } + + fireConnectorAccepted(connector); + } + catch (Exception ex) + { + ex.printStackTrace(); + + try + { + socketChannel.close(); + } + catch (IOException ioex) + { + ioex.printStackTrace(); + } + } + } + + protected AbstractTCPConnector createConnector(SocketChannel socketChannel) + { + return new ServerTCPConnectorImpl(socketChannel, getReceiveExecutor(), + getProtocolFactoryRegistry(), bufferProvider, selector); + } + + protected void fireConnectorAccepted(TCPConnector connector) + { + for (TCPAcceptorListener listener : listeners) + { + try + { + listener.notifyConnectorAccepted(this, connector); + } + catch (Exception ex) + { + ex.printStackTrace(); + } + } + } + + @Override + protected void onAccessBeforeActivate() throws Exception + { + super.onAccessBeforeActivate(); + if (bufferProvider == null) + { + throw new IllegalStateException("bufferProvider == null"); + } + + if (protocolFactoryRegistry == null) + { + System.out.println(toString() + ": (INFO) protocolFactoryRegistry == null"); + } + + if (receiveExecutor == null) + { + System.out.println(toString() + ": (INFO) receiveExecutor == null"); + } + + if (selector == null) + { + selector = TCPUtil.createTCPSelector(); + LifecycleUtil.activate(selector); + } + } + + @Override + protected void onActivate() throws Exception + { + super.onActivate(); + InetAddress addr = InetAddress.getByName(listenAddr); + InetSocketAddress sAddr = new InetSocketAddress(addr, listenPort); + + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(false); + serverSocketChannel.socket().bind(sAddr); + + selectionKey = selector.register(serverSocketChannel, this); + } + + @Override + protected void onDeactivate() throws Exception + { + for (TCPConnector connector : getAcceptedConnectors()) + { + try + { + LifecycleUtil.deactivate(connector); + } + catch (Exception ex) + { + ex.printStackTrace(); + } + } + + Exception exception = null; + + try + { + selectionKey.cancel(); + } + catch (RuntimeException ex) + { + if (exception == null) + { + exception = ex; + } + } + finally + { + selectionKey = null; + } + + try + { + serverSocketChannel.close(); + } + catch (RuntimeException ex) + { + if (exception == null) + { + exception = ex; + } + } + finally + { + serverSocketChannel = null; + } + + if (exception != null) + { + throw exception; + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java new file mode 100644 index 0000000000..ec1d1c66ed --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPSelectorImpl.java @@ -0,0 +1,212 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.internal.net4j.transport.tcp; + +import org.eclipse.net4j.transport.tcp.TCPSelector; +import org.eclipse.net4j.transport.tcp.TCPSelectorListener; +import org.eclipse.net4j.util.lifecycle.AbstractLifecycle; + +import java.io.IOException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ClosedSelectorException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; + +/** + * @author Eike Stepper + */ +public class TCPSelectorImpl extends AbstractLifecycle implements TCPSelector, Runnable +{ + private static final long SELECT_TIMEOUT = 100; + + private Selector selector; + + private Thread thread; + + public TCPSelectorImpl() + { + } + + public SelectionKey register(ServerSocketChannel channel, TCPSelectorListener.Passive listener) + throws ClosedChannelException + { + if (listener == null) + { + throw new IllegalArgumentException("listener == null"); + } + + System.out.println(toString() + ": Registering " + TCPUtil.toString(channel)); + return channel.register(selector, SelectionKey.OP_ACCEPT, listener); + } + + public SelectionKey register(SocketChannel channel, TCPSelectorListener.Active listener) + throws ClosedChannelException + { + if (listener == null) + { + throw new IllegalArgumentException("listener == null"); + } + + System.out.println(toString() + ": Registering " + TCPUtil.toString(channel)); + return channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, listener); + } + + public void run() + { + while (isActive()) + { + if (Thread.interrupted()) + { + deactivate(); + break; + } + + try + { + if (selector.select(SELECT_TIMEOUT) > 0) + { + Iterator<SelectionKey> it = selector.selectedKeys().iterator(); + while (it.hasNext()) + { + SelectionKey selKey = it.next(); + it.remove(); + + try + { + handleSelection(selKey); + } + catch (CancelledKeyException ignore) + { + ; // Do nothing + } + catch (Exception ex) + { + ex.printStackTrace(); + selKey.cancel(); + } + } + } + } + catch (ClosedSelectorException ex) + { + break; + } + catch (Exception ex) + { + ex.printStackTrace(); + deactivate(); + break; + } + } + } + + @Override + public String toString() + { + return "TCPSelector"; + } + + protected void handleSelection(SelectionKey selKey) throws IOException + { + SelectableChannel channel = selKey.channel(); + if (channel instanceof ServerSocketChannel) + { + ServerSocketChannel ssChannel = (ServerSocketChannel)selKey.channel(); + TCPSelectorListener.Passive listener = (TCPSelectorListener.Passive)selKey.attachment(); + + if (selKey.isAcceptable()) + { + System.out.println(toString() + ": Accepting " + TCPUtil.toString(ssChannel)); + listener.handleAccept(this, ssChannel); + } + } + else if (channel instanceof SocketChannel) + { + SocketChannel sChannel = (SocketChannel)channel; + TCPSelectorListener.Active listener = (TCPSelectorListener.Active)selKey.attachment(); + + if (selKey.isConnectable()) + { + System.out.println(toString() + ": Connecting " + TCPUtil.toString(sChannel)); + listener.handleConnect(this, sChannel); + } + + if (selKey.isReadable()) + { + System.out.println(toString() + ": Reading " + TCPUtil.toString(sChannel)); + listener.handleRead(this, sChannel); + } + + if (selKey.isWritable()) + { + System.out.println(toString() + ": Writing " + TCPUtil.toString(sChannel)); + listener.handleWrite(this, sChannel); + } + } + } + + @Override + protected void onActivate() throws Exception + { + selector = Selector.open(); + thread = new Thread(this); + thread.setDaemon(true); + thread.start(); + } + + @Override + protected void onDeactivate() throws Exception + { + Exception exception = null; + + try + { + thread.join(2 * SELECT_TIMEOUT); + } + catch (RuntimeException ex) + { + if (exception == null) + { + exception = ex; + } + } + finally + { + thread = null; + } + + try + { + selector.close(); + } + catch (Exception ex) + { + if (exception == null) + { + exception = ex; + } + } + finally + { + selector = null; + } + + if (exception != null) + { + throw exception; + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java new file mode 100644 index 0000000000..634ca4389c --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/TCPUtil.java @@ -0,0 +1,161 @@ +/*************************************************************************** + * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.internal.net4j.transport.tcp; + +import org.eclipse.net4j.transport.BufferProvider; +import org.eclipse.net4j.transport.tcp.TCPAcceptor; +import org.eclipse.net4j.transport.tcp.TCPConnector; +import org.eclipse.net4j.transport.tcp.TCPSelector; + +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +/** + * @author Eike Stepper + */ +public final class TCPUtil +{ + private TCPUtil() + { + } + + public static TCPSelector createTCPSelector() + { + TCPSelectorImpl selector = new TCPSelectorImpl(); + return selector; + } + + public static TCPAcceptor createTCPAcceptor(BufferProvider bufferProvider, TCPSelector selector, + String address, int port) + { + TCPAcceptorImpl acceptor = new TCPAcceptorImpl(); + acceptor.setBufferProvider(bufferProvider); + acceptor.setSelector(selector); + acceptor.setListenPort(port); + acceptor.setListenAddr(address); + return acceptor; + } + + public static TCPAcceptor createTCPAcceptor(BufferProvider bufferProvider, TCPSelector selector) + { + return createTCPAcceptor(bufferProvider, selector, TCPAcceptor.DEFAULT_ADDRESS, + TCPAcceptor.DEFAULT_PORT); + } + + public static TCPConnector createTCPConnector(BufferProvider bufferProvider, + TCPSelector selector, String host, int port) + { + ClientTCPConnectorImpl connector = new ClientTCPConnectorImpl(); + connector.setBufferProvider(bufferProvider); + connector.setSelector(selector); + connector.setHost(host); + connector.setPort(port); + return connector; + } + + public static TCPConnector createTCPConnector(BufferProvider bufferProvider, + TCPSelector selector, String host) + { + return createTCPConnector(bufferProvider, selector, host, TCPConnector.DEFAULT_PORT); + } + + public static String toString(ServerSocketChannel channel) + { + return channel.toString(); + // return "ServerSocketChannel[" + channel.socket().getLocalSocketAddress() + // + "]"; + } + + public static String toString(SocketChannel channel) + { + return channel.toString(); + // return "SocketChannel[" + channel.socket().getRemoteSocketAddress() + + // "]"; + } + + public static String formatInterestOps(int newOps) + { + StringBuilder builder = new StringBuilder(); + if ((newOps & SelectionKey.OP_ACCEPT) != 0) + { + addInterestOp(builder, "ACCEPT"); + } + + if ((newOps & SelectionKey.OP_CONNECT) != 0) + { + addInterestOp(builder, "CONNECT"); + } + + if ((newOps & SelectionKey.OP_READ) != 0) + { + addInterestOp(builder, "READ"); + } + + if ((newOps & SelectionKey.OP_WRITE) != 0) + { + addInterestOp(builder, "WRITE"); + } + + return builder.toString(); + } + + public static void setInterest(SelectionKey selectionKey, int operation, boolean interested) + { + int newOps; + int oldOps = selectionKey.interestOps(); + if (interested) + { + newOps = oldOps | operation; + } + else + { + newOps = oldOps & ~operation; + } + + if (oldOps != newOps) + { + System.out.println(selectionKey.channel().toString() + ": Setting interest " + + formatInterestOps(newOps) + " (was " + formatInterestOps(oldOps).toLowerCase() + ")"); + selectionKey.interestOps(newOps); + } + } + + public static void setAcceptInterest(SelectionKey selectionKey, boolean interested) + { + setInterest(selectionKey, SelectionKey.OP_ACCEPT, interested); + } + + public static void setConnectInterest(SelectionKey selectionKey, boolean interested) + { + setInterest(selectionKey, SelectionKey.OP_CONNECT, interested); + } + + public static void setReadInterest(SelectionKey selectionKey, boolean interested) + { + setInterest(selectionKey, SelectionKey.OP_READ, interested); + } + + public static void setWriteInterest(SelectionKey selectionKey, boolean interested) + { + setInterest(selectionKey, SelectionKey.OP_WRITE, interested); + } + + private static void addInterestOp(StringBuilder builder, String op) + { + if (builder.length() != 0) + { + builder.append("|"); + } + + builder.append(op); + } +} |