/*************************************************************************** * Copyright (c) 2004-2007 Eike Stepper, Germany. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ package org.eclipse.internal.net4j.transport; import org.eclipse.net4j.transport.IBuffer; import org.eclipse.net4j.transport.IBufferHandler; import org.eclipse.net4j.transport.IBufferProvider; import org.eclipse.net4j.transport.IChannel; import org.eclipse.net4j.transport.IChannelID; import org.eclipse.net4j.transport.IConnector; import org.eclipse.net4j.util.ObjectUtil; import org.eclipse.net4j.util.concurrent.IWorkSerializer; import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.net4j.util.registry.IRegistry; import org.eclipse.internal.net4j.bundle.Net4j; import org.eclipse.internal.net4j.transport.Buffer.State; import org.eclipse.internal.net4j.util.Value; import org.eclipse.internal.net4j.util.concurrent.AsynchronousWorkSerializer; import org.eclipse.internal.net4j.util.concurrent.SynchronousWorkSerializer; import org.eclipse.internal.net4j.util.lifecycle.Lifecycle; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; /** * @author Eike Stepper */ public class Channel extends Lifecycle implements IChannel, IBufferProvider { private static final ContextTracer TRACER = new ContextTracer(Net4j.DEBUG_CHANNEL, Channel.class); private short channelIndex = Buffer.NO_CHANNEL; private Connector connector; /** * The external handler for buffers passed from the {@link #connector}. *

*/ private IBufferHandler receiveHandler; private ExecutorService receiveExecutor; private IWorkSerializer receiveSerializer; private Queue sendQueue; public IRegistry channelRegistry; public Channel(ExecutorService receiveExecutor) { this.receiveExecutor = receiveExecutor; } public IChannelID getID() { return new ChannelIDImpl(); } public short getChannelIndex() { return channelIndex; } public void setChannelIndex(short channelIndex) { if (channelIndex == Buffer.NO_CHANNEL) { throw new IllegalArgumentException("channelIndex == INVALID_CHANNEL_ID"); //$NON-NLS-1$ } this.channelIndex = channelIndex; } public Connector getConnector() { return connector; } public void setConnector(Connector connector) { this.connector = connector; } public short getBufferCapacity() { return connector.getBufferProvider().getBufferCapacity(); } public IBuffer provideBuffer() { return connector.getBufferProvider().provideBuffer(); } public void retainBuffer(IBuffer buffer) { connector.getBufferProvider().retainBuffer(buffer); } public Queue getSendQueue() { return sendQueue; } public IRegistry getChannelRegistry() { return channelRegistry; } public void setChannelRegistry(IRegistry channelRegistry) { this.channelRegistry = channelRegistry; } public IBufferHandler getReceiveHandler() { return receiveHandler; } public void setReceiveHandler(IBufferHandler receiveHandler) { this.receiveHandler = receiveHandler; } public ExecutorService getReceiveExecutor() { return receiveExecutor; } public boolean isInternal() { return false; } public void close() { deactivate(); } public void sendBuffer(IBuffer buffer) { handleBuffer(buffer); } public void handleBuffer(IBuffer buffer) { State state = ((Buffer)buffer).getState(); if (state != State.PUTTING) { Net4j.LOG.warn("Ignoring buffer in state == " + state + ": " + this); //$NON-NLS-1$ //$NON-NLS-2$ return; } if (TRACER.isEnabled()) { TRACER.format("Handling buffer from client: {0} --> {1}", buffer, this); //$NON-NLS-1$ } if (sendQueue == null) { throw new IllegalStateException("sendQueue == null"); } sendQueue.add(buffer); connector.multiplexBuffer(this); } public void handleBufferFromMultiplexer(final IBuffer buffer) { if (receiveHandler == null) { Net4j.LOG.warn("Ignoring buffer because receiveHandler == null: " + this); //$NON-NLS-1$ buffer.release(); return; } if (TRACER.isEnabled()) { TRACER.format("Handling buffer from multiplexer: {0} --> {1}", buffer, this); //$NON-NLS-1$ } receiveSerializer.addWork(new ReceiverWork(buffer)); } @Override public String toString() { return "Channel[" + connector + ", channelIndex=" + channelIndex + "]"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ } @Override protected void doBeforeActivate() throws Exception { super.doBeforeActivate(); if (channelIndex == Buffer.NO_CHANNEL) { throw new IllegalStateException("channelIndex == INVALID_CHANNEL_ID"); //$NON-NLS-1$ } if (connector == null) { throw new IllegalStateException("connector == null"); //$NON-NLS-1$ } } @Override protected void doActivate() throws Exception { super.doActivate(); sendQueue = new ConcurrentLinkedQueue(); if (receiveExecutor == null) { receiveSerializer = new SynchronousWorkSerializer(); } else { receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor); } if (!isInternal() && channelRegistry != null) { channelRegistry.put(getID(), this); channelRegistry.commit(); } } @Override protected void doDeactivate() throws Exception { if (!isInternal() && channelRegistry != null) { channelRegistry.remove(getID()); channelRegistry.commit(); } receiveSerializer = null; if (sendQueue != null) { sendQueue.clear(); sendQueue = null; } super.doDeactivate(); } /** * @author Eike Stepper */ private final class ChannelIDImpl extends Value implements IChannelID { private static final long serialVersionUID = 1L; public ChannelIDImpl() { } public IConnector getConnector() { return connector; } public short getChannelIndex() { return channelIndex; } @Override protected Object clone() throws CloneNotSupportedException { return this; } @Override public boolean equals(Object obj) { if (obj instanceof IChannelID) { IChannelID that = (IChannelID)obj; return channelIndex == that.getChannelIndex() && ObjectUtil.equals(connector, that.getConnector()); } return false; } @Override public int hashCode() { return ObjectUtil.hashCode(connector) ^ channelIndex; } @Override public String toString() { return "ChannelID[" + connector + ", channelIndex=" + channelIndex + "]"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ } } /** * @author Eike Stepper */ private final class ReceiverWork implements Runnable { private final IBuffer buffer; private ReceiverWork(IBuffer buffer) { this.buffer = buffer; } public void run() { receiveHandler.handleBuffer(buffer); } } }