/*************************************************************************** * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ package org.eclipse.internal.net4j.transport.tcp; import org.eclipse.net4j.transport.Buffer; import org.eclipse.net4j.transport.Channel; import org.eclipse.net4j.transport.ConnectorException; import org.eclipse.net4j.transport.tcp.TCPConnector; import org.eclipse.net4j.transport.tcp.TCPSelector; import org.eclipse.net4j.transport.tcp.TCPSelectorListener; import org.eclipse.net4j.util.Net4jUtil; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.internal.net4j.bundle.Net4j; import org.eclipse.internal.net4j.transport.AbstractConnector; import org.eclipse.internal.net4j.transport.ChannelImpl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.List; import java.util.Queue; /** * @author Eike Stepper */ public abstract class AbstractTCPConnector extends AbstractConnector implements TCPConnector, TCPSelectorListener.Active { private static final ContextTracer TRACER = new ContextTracer(Net4j.DEBUG_CONNECTOR, AbstractTCPConnector.class); private SocketChannel socketChannel; private TCPSelector selector; private SelectionKey selectionKey; private Buffer inputBuffer; private ControlChannelImpl controlChannel; public AbstractTCPConnector() { try { socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException ex) { Net4j.LOG.error(ex); } } /** * SocketChannel must already be non-blocking! */ public AbstractTCPConnector(SocketChannel socketChannel) { this.socketChannel = socketChannel; } public TCPSelector getSelector() { return selector; } public void setSelector(TCPSelector selector) { this.selector = selector; } public SocketChannel getSocketChannel() { return socketChannel; } /** * Called by {@link ChannelImpl} each time a new buffer is available for * multiplexing. This or another buffer can be dequeued from the outputQueue * of the {@link ChannelImpl}. */ public void multiplexBuffer(Channel channel) { checkSelectionKey(); selector.setWriteInterest(selectionKey, true); } public void registered(SelectionKey selectionKey) { this.selectionKey = selectionKey; if (getType() == Type.SERVER) { selector.setConnectInterest(selectionKey, false); } } public void handleConnect(TCPSelector selector, SocketChannel channel) { try { if (!channel.finishConnect()) { return; } } catch (Exception ex) { return; } try { checkSelectionKey(); selector.setConnectInterest(selectionKey, false); setState(State.NEGOTIATING); } catch (Exception ex) { Net4j.LOG.error(ex); deactivate(); } } public void handleRead(TCPSelector selector, SocketChannel socketChannel) { try { if (inputBuffer == null) { inputBuffer = getBufferProvider().provideBuffer(); } ByteBuffer byteBuffer = inputBuffer.startGetting(socketChannel); if (byteBuffer != null) { short channelIndex = inputBuffer.getChannelIndex(); ChannelImpl channel = channelIndex == ControlChannelImpl.CONTROL_CHANNEL_ID ? controlChannel : getChannel(channelIndex); if (channel != null) { channel.handleBufferFromMultiplexer(inputBuffer); } else { if (TRACER.isEnabled()) { TRACER.trace("Discarding buffer from unknown channel"); //$NON-NLS-1$ } inputBuffer.release(); } inputBuffer = null; } } catch (ClosedChannelException ex) { deactivate(); } catch (Exception ex) { Net4j.LOG.error(ex); deactivate(); } } public void handleWrite(TCPSelector selector, SocketChannel socketChannel) { try { boolean moreToWrite = false; for (Queue bufferQueue : getChannelBufferQueues()) { Buffer buffer = bufferQueue.peek(); if (buffer != null) { if (buffer.write(socketChannel)) { bufferQueue.poll(); buffer.release(); if (!moreToWrite) { moreToWrite = !bufferQueue.isEmpty(); } } else { moreToWrite = true; break; } } } if (!moreToWrite) { checkSelectionKey(); selector.setWriteInterest(selectionKey, false); } } catch (NullPointerException ignore) { ; } catch (ClosedChannelException ex) { deactivate(); } catch (Exception ex) { Net4j.LOG.error(ex); deactivate(); } } @Override protected List> getChannelBufferQueues() { List> queues = super.getChannelBufferQueues(); Queue controlQueue = controlChannel.getSendQueue(); if (!controlQueue.isEmpty()) { queues.add(controlQueue); } return queues; } @Override protected void registerChannelWithPeer(short channelIndex, String protocolID) throws ConnectorException { try { if (!controlChannel.registerChannel(channelIndex, protocolID)) { throw new ConnectorException("Failed to register channel with peer"); //$NON-NLS-1$ } } catch (ConnectorException ex) { throw ex; } catch (Exception ex) { throw new ConnectorException(ex); } } @Override protected void removeChannel(ChannelImpl channel) { if (isConnected()) { controlChannel.deregisterChannel(channel.getChannelIndex()); } super.removeChannel(channel); } @Override protected void onAboutToActivate() throws Exception { super.onAboutToActivate(); if (selector == null) { selector = Net4jUtil.createTCPSelector(); LifecycleUtil.activate(selector); } } @Override protected void onActivate() throws Exception { super.onActivate(); controlChannel = new ControlChannelImpl(this); controlChannel.activate(); selector.registerAsync(socketChannel, this); } @Override protected void onDeactivate() throws Exception { Exception exception = null; try { controlChannel.deactivate(); } catch (Exception ex) { if (exception == null) { exception = ex; } } finally { controlChannel = null; } try { socketChannel.close(); } catch (Exception ex) { if (exception == null) { exception = ex; } } finally { socketChannel = null; } try { super.onDeactivate(); } catch (Exception ex) { if (exception == null) { exception = ex; } } if (exception != null) { throw exception; } } private void checkSelectionKey() { if (selectionKey == null) { throw new IllegalStateException("selectionKey == null"); } } }