diff options
author | Eike Stepper | 2012-06-07 10:14:51 +0000 |
---|---|---|
committer | Eike Stepper | 2012-06-07 10:14:51 +0000 |
commit | a25531ae8a5c85586fdb590c0f806be3a807b634 (patch) | |
tree | 7849f8065be919611dabd87403f9bf5a1743cee7 /plugins/org.eclipse.net4j | |
parent | b48ccc46da60cf75aef219ae635c627fb49d3cf2 (diff) | |
download | cdo-a25531ae8a5c85586fdb590c0f806be3a807b634.tar.gz cdo-a25531ae8a5c85586fdb590c0f806be3a807b634.tar.xz cdo-a25531ae8a5c85586fdb590c0f806be3a807b634.zip |
[381472] Design a repository administration API
https://bugs.eclipse.org/bugs/show_bug.cgi?id=381472
Diffstat (limited to 'plugins/org.eclipse.net4j')
3 files changed, 1296 insertions, 1290 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java index 7dd1c77a4a..44b0ccc8f5 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java @@ -1,487 +1,491 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- * Andre Dietisheim - maintenance
- */
-package org.eclipse.spi.net4j;
-
-import org.eclipse.net4j.buffer.BufferState;
-import org.eclipse.net4j.buffer.IBuffer;
-import org.eclipse.net4j.buffer.IBufferHandler;
-import org.eclipse.net4j.channel.IChannelMultiplexer;
-import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.util.concurrent.IWorkSerializer;
-import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer;
-import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer;
-import org.eclipse.net4j.util.event.Event;
-import org.eclipse.net4j.util.event.IListener;
-import org.eclipse.net4j.util.lifecycle.Lifecycle;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.log.OMLogger;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import org.eclipse.spi.net4j.InternalChannel.SendQueueEvent.Type;
-
-import java.text.MessageFormat;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public class Channel extends Lifecycle implements InternalChannel
-{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CHANNEL, Channel.class);
-
- private String userID;
-
- private InternalChannelMultiplexer channelMultiplexer;
-
- private short id = IBuffer.NO_CHANNEL;
-
- private ExecutorService receiveExecutor;
-
- /**
- * The external handler for buffers passed from the {@link #connector}.
- */
- private IBufferHandler receiveHandler;
-
- private IWorkSerializer receiveSerializer;
-
- private transient Queue<IBuffer> sendQueue;
-
- private transient long sentBuffers;
-
- private transient long receivedBuffers;
-
- public Channel()
- {
- }
-
- public String getUserID()
- {
- return userID;
- }
-
- public void setUserID(String userID)
- {
- this.userID = userID;
- }
-
- public Location getLocation()
- {
- return channelMultiplexer.getLocation();
- }
-
- public boolean isClient()
- {
- return channelMultiplexer.isClient();
- }
-
- public boolean isServer()
- {
- return channelMultiplexer.isServer();
- }
-
- public IChannelMultiplexer getMultiplexer()
- {
- return channelMultiplexer;
- }
-
- public void setMultiplexer(IChannelMultiplexer channelMultiplexer)
- {
- this.channelMultiplexer = (InternalChannelMultiplexer)channelMultiplexer;
- }
-
- public short getID()
- {
- return id;
- }
-
- public void setID(short id)
- {
- checkArg(id != IBuffer.NO_CHANNEL, "id == IBuffer.NO_CHANNEL"); //$NON-NLS-1$
- this.id = id;
- }
-
- public ExecutorService getReceiveExecutor()
- {
- return receiveExecutor;
- }
-
- public void setReceiveExecutor(ExecutorService receiveExecutor)
- {
- this.receiveExecutor = receiveExecutor;
- }
-
- public IBufferHandler getReceiveHandler()
- {
- return receiveHandler;
- }
-
- public void setReceiveHandler(IBufferHandler receiveHandler)
- {
- this.receiveHandler = receiveHandler;
- }
-
- /**
- * @since 3.0
- */
- public long getSentBuffers()
- {
- return sentBuffers;
- }
-
- /**
- * @since 3.0
- */
- public long getReceivedBuffers()
- {
- return receivedBuffers;
- }
-
- public Queue<IBuffer> getSendQueue()
- {
- return sendQueue;
- }
-
- public void sendBuffer(IBuffer buffer)
- {
- handleBuffer(buffer);
- }
-
- /**
- * Handles the given buffer. Ensures it is in the PUTTING state (otherwise ignores it) and sends it on behalf of the
- * send queue.
- *
- * @see IBuffer#getState
- * @see BufferState#PUTTING
- * @see Channel#sendQueue
- */
- public void handleBuffer(IBuffer buffer)
- {
- BufferState state = buffer.getState();
- if (state != BufferState.PUTTING)
- {
- OM.LOG.warn("Ignoring buffer in state == " + state + ": " + this); //$NON-NLS-1$ //$NON-NLS-2$
- return;
- }
-
- if (TRACER.isEnabled())
- {
- TRACER.format("Handling buffer: {0} --> {1}", buffer, this); //$NON-NLS-1$
- }
-
- if (sendQueue == null)
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Ignoring buffer because sendQueue == null: " + this); //$NON-NLS-1$
- }
-
- buffer.release();
- }
- else
- {
- sendQueue.add(buffer);
- ++sentBuffers;
- channelMultiplexer.multiplexChannel(this);
- }
- }
-
- /**
- * Handles a buffer sent by the multiplexer. Adds work to the receive queue or releases the buffer.
- *
- * @see InternalChannelMultiplexer#multiplexChannel
- * @see IWorkSerializer
- * @see ReceiverWork
- */
- public void handleBufferFromMultiplexer(IBuffer buffer)
- {
- if (receiveHandler != null)
- {
- if (TRACER.isEnabled())
- {
- TRACER.format("Handling buffer from multiplexer: {0} --> {1}", buffer, this); //$NON-NLS-1$
- }
-
- ++receivedBuffers;
- receiveSerializer.addWork(createReceiverWork(buffer));
- }
- else
- {
- // Shutting down
- buffer.release();
- }
- }
-
- protected ReceiverWork createReceiverWork(IBuffer buffer)
- {
- return new ReceiverWork(buffer);
- }
-
- public short getBufferCapacity()
- {
- return channelMultiplexer.getBufferCapacity();
- }
-
- public IBuffer provideBuffer()
- {
- return channelMultiplexer.provideBuffer();
- }
-
- public void retainBuffer(IBuffer buffer)
- {
- channelMultiplexer.retainBuffer(buffer);
- }
-
- @Override
- public String toString()
- {
- if (receiveHandler instanceof IProtocol)
- {
- IProtocol<?> protocol = (IProtocol<?>)receiveHandler;
- return MessageFormat.format("Channel[{0}, {1}, {2}]", id, getLocation(), protocol.getType()); //$NON-NLS-1$
- }
-
- return MessageFormat.format("Channel[{0}, {1}]", id, getLocation()); //$NON-NLS-1$
- }
-
- @Override
- protected void doBeforeActivate() throws Exception
- {
- super.doBeforeActivate();
- checkState(id != IBuffer.NO_CHANNEL, "channelID == NO_CHANNEL"); //$NON-NLS-1$
- checkState(channelMultiplexer, "channelMultiplexer"); //$NON-NLS-1$
- }
-
- @Override
- protected void doActivate() throws Exception
- {
- super.doActivate();
- sendQueue = new SendQueue();
- if (receiveExecutor == null)
- {
- receiveSerializer = new SynchronousWorkSerializer();
- }
- else
- {
- // CompletionWorkSerializer throws "One command already pending"
- // receiveSerializer = new CompletionWorkSerializer();
- // receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor);
- // receiveSerializer = new SynchronousWorkSerializer();
-
- class ChannelReceiveSerializer extends QueueWorkerWorkSerializer
- {
- @Override
- protected String getThreadName()
- {
- return "ReceiveSerializer-" + Channel.this; //$NON-NLS-1$
- }
- }
-
- receiveSerializer = new ChannelReceiveSerializer();
- }
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- unregisterFromMultiplexer();
- if (receiveSerializer != null)
- {
- receiveSerializer.dispose();
- receiveSerializer = null;
- }
-
- if (sendQueue != null)
- {
- sendQueue.clear();
- sendQueue = null;
- }
-
- super.doDeactivate();
- }
-
- protected void unregisterFromMultiplexer()
- {
- channelMultiplexer.closeChannel(this);
- }
-
- public void close()
- {
- LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG);
- }
-
- public boolean isClosed()
- {
- return !isActive();
- }
-
- /**
- * @author Eike Stepper
- */
- protected class ReceiverWork implements Runnable
- {
- private final IBuffer buffer;
-
- /**
- * @since 3.0
- */
- public ReceiverWork(IBuffer buffer)
- {
- this.buffer = buffer;
- }
-
- public void run()
- {
- IBufferHandler receiveHandler = getReceiveHandler();
- if (receiveHandler != null)
- {
- receiveHandler.handleBuffer(buffer);
- }
- else
- {
- // Shutting down
- buffer.release();
- }
- }
- }
-
- /**
- * A queue that holds buffers that shall be sent. This implementation notifies observers of enqueued and dequeued
- * buffers. The notification's deliberately not synchronized. It shall only be used by O&M tooling to offer (not 100%
- * accurate) statistical insights
- *
- * @author Eike Stepper
- * @since 3.0
- */
- protected class SendQueue extends ConcurrentLinkedQueue<IBuffer>
- {
- private static final long serialVersionUID = 1L;
-
- private AtomicInteger size = new AtomicInteger();
-
- protected SendQueue()
- {
- }
-
- @Override
- public boolean add(IBuffer o)
- {
- super.add(o);
- added();
- return true;
- }
-
- @Override
- public boolean offer(IBuffer o)
- {
- super.offer(o);
- added();
- return true;
- }
-
- @Override
- public IBuffer poll()
- {
- IBuffer result = super.poll();
- if (result != null)
- {
- removed();
- }
-
- return result;
- }
-
- @Override
- public IBuffer remove()
- {
- IBuffer result = super.remove();
- if (result != null)
- {
- removed();
- }
-
- return result;
- }
-
- @Override
- public boolean remove(Object o)
- {
- boolean result = super.remove(o);
- if (result)
- {
- removed();
- }
-
- return result;
- }
-
- private void added()
- {
- int queueSize = size.incrementAndGet();
- IListener[] listeners = getListeners();
- if (listeners != null)
- {
- fireEvent(new SendQueueEventImpl(Type.ENQUEUED, queueSize), listeners);
- }
- }
-
- private void removed()
- {
- int queueSize = size.decrementAndGet();
- IListener[] listeners = getListeners();
- if (listeners != null)
- {
- fireEvent(new SendQueueEventImpl(Type.DEQUEUED, queueSize), listeners);
- }
- }
- }
-
- /**
- * @author Eike Stepper
- */
- private final class SendQueueEventImpl extends Event implements SendQueueEvent
- {
- private static final long serialVersionUID = 1L;
-
- private Type type;
-
- private final int queueSize;
-
- private SendQueueEventImpl(Type type, int queueSize)
- {
- super(Channel.this);
- this.type = type;
- this.queueSize = queueSize;
- }
-
- @Override
- public InternalChannel getSource()
- {
- return (InternalChannel)super.getSource();
- }
-
- public Type getType()
- {
- return type;
- }
-
- public int getQueueSize()
- {
- return queueSize;
- }
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + * Andre Dietisheim - maintenance + */ +package org.eclipse.spi.net4j; + +import org.eclipse.net4j.buffer.BufferState; +import org.eclipse.net4j.buffer.IBuffer; +import org.eclipse.net4j.buffer.IBufferHandler; +import org.eclipse.net4j.channel.IChannelMultiplexer; +import org.eclipse.net4j.protocol.IProtocol; +import org.eclipse.net4j.util.concurrent.IWorkSerializer; +import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer; +import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer; +import org.eclipse.net4j.util.event.Event; +import org.eclipse.net4j.util.event.IListener; +import org.eclipse.net4j.util.lifecycle.Lifecycle; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.log.OMLogger; +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import org.eclipse.internal.net4j.bundle.OM; + +import org.eclipse.spi.net4j.InternalChannel.SendQueueEvent.Type; + +import java.text.MessageFormat; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author Eike Stepper + * @since 2.0 + */ +public class Channel extends Lifecycle implements InternalChannel +{ + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CHANNEL, Channel.class); + + private String userID; + + private InternalChannelMultiplexer channelMultiplexer; + + private short id = IBuffer.NO_CHANNEL; + + private ExecutorService receiveExecutor; + + /** + * The external handler for buffers passed from the {@link #connector}. + */ + private IBufferHandler receiveHandler; + + private IWorkSerializer receiveSerializer; + + private transient Queue<IBuffer> sendQueue; + + private transient long sentBuffers; + + private transient long receivedBuffers; + + public Channel() + { + } + + public String getUserID() + { + return userID; + } + + public void setUserID(String userID) + { + this.userID = userID; + } + + public Location getLocation() + { + return channelMultiplexer.getLocation(); + } + + public boolean isClient() + { + return channelMultiplexer.isClient(); + } + + public boolean isServer() + { + return channelMultiplexer.isServer(); + } + + public IChannelMultiplexer getMultiplexer() + { + return channelMultiplexer; + } + + public void setMultiplexer(IChannelMultiplexer channelMultiplexer) + { + this.channelMultiplexer = (InternalChannelMultiplexer)channelMultiplexer; + } + + public short getID() + { + return id; + } + + public void setID(short id) + { + checkArg(id != IBuffer.NO_CHANNEL, "id == IBuffer.NO_CHANNEL"); //$NON-NLS-1$ + this.id = id; + } + + public ExecutorService getReceiveExecutor() + { + return receiveExecutor; + } + + public void setReceiveExecutor(ExecutorService receiveExecutor) + { + this.receiveExecutor = receiveExecutor; + } + + public IBufferHandler getReceiveHandler() + { + return receiveHandler; + } + + public void setReceiveHandler(IBufferHandler receiveHandler) + { + this.receiveHandler = receiveHandler; + } + + /** + * @since 3.0 + */ + public long getSentBuffers() + { + return sentBuffers; + } + + /** + * @since 3.0 + */ + public long getReceivedBuffers() + { + return receivedBuffers; + } + + public Queue<IBuffer> getSendQueue() + { + return sendQueue; + } + + public void sendBuffer(IBuffer buffer) + { + handleBuffer(buffer); + } + + /** + * Handles the given buffer. Ensures it is in the PUTTING state (otherwise ignores it) and sends it on behalf of the + * send queue. + * + * @see IBuffer#getState + * @see BufferState#PUTTING + * @see Channel#sendQueue + */ + public void handleBuffer(IBuffer buffer) + { + BufferState state = buffer.getState(); + if (state != BufferState.PUTTING) + { + OM.LOG.warn("Ignoring buffer in state == " + state + ": " + this); //$NON-NLS-1$ //$NON-NLS-2$ + return; + } + + if (TRACER.isEnabled()) + { + TRACER.format("Handling buffer: {0} --> {1}", buffer, this); //$NON-NLS-1$ + } + + if (sendQueue == null) + { + if (TRACER.isEnabled()) + { + TRACER.trace("Ignoring buffer because sendQueue == null: " + this); //$NON-NLS-1$ + } + + buffer.release(); + } + else + { + sendQueue.add(buffer); + ++sentBuffers; + channelMultiplexer.multiplexChannel(this); + } + } + + /** + * Handles a buffer sent by the multiplexer. Adds work to the receive queue or releases the buffer. + * + * @see InternalChannelMultiplexer#multiplexChannel + * @see IWorkSerializer + * @see ReceiverWork + */ + public void handleBufferFromMultiplexer(IBuffer buffer) + { + if (receiveHandler != null) + { + if (TRACER.isEnabled()) + { + TRACER.format("Handling buffer from multiplexer: {0} --> {1}", buffer, this); //$NON-NLS-1$ + } + + ++receivedBuffers; + receiveSerializer.addWork(createReceiverWork(buffer)); + } + else + { + // Shutting down + buffer.release(); + } + } + + protected ReceiverWork createReceiverWork(IBuffer buffer) + { + return new ReceiverWork(buffer); + } + + public short getBufferCapacity() + { + return channelMultiplexer.getBufferCapacity(); + } + + public IBuffer provideBuffer() + { + return channelMultiplexer.provideBuffer(); + } + + public void retainBuffer(IBuffer buffer) + { + channelMultiplexer.retainBuffer(buffer); + } + + @Override + public String toString() + { + if (receiveHandler instanceof IProtocol) + { + IProtocol<?> protocol = (IProtocol<?>)receiveHandler; + return MessageFormat.format("Channel[{0}, {1}, {2}]", id, getLocation(), protocol.getType()); //$NON-NLS-1$ + } + + return MessageFormat.format("Channel[{0}, {1}]", id, getLocation()); //$NON-NLS-1$ + } + + @Override + protected void doBeforeActivate() throws Exception + { + super.doBeforeActivate(); + checkState(id != IBuffer.NO_CHANNEL, "channelID == NO_CHANNEL"); //$NON-NLS-1$ + checkState(channelMultiplexer, "channelMultiplexer"); //$NON-NLS-1$ + } + + @Override + protected void doActivate() throws Exception + { + super.doActivate(); + sendQueue = new SendQueue(); + if (receiveExecutor == null) + { + receiveSerializer = new SynchronousWorkSerializer(); + } + else + { + receiveSerializer = new ReceiveSerializer(); + } + } + + @Override + protected void doDeactivate() throws Exception + { + unregisterFromMultiplexer(); + if (receiveSerializer != null) + { + receiveSerializer.dispose(); + receiveSerializer = null; + } + + if (sendQueue != null) + { + sendQueue.clear(); + sendQueue = null; + } + + super.doDeactivate(); + } + + protected void unregisterFromMultiplexer() + { + channelMultiplexer.closeChannel(this); + } + + public void close() + { + LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG); + } + + public boolean isClosed() + { + return !isActive(); + } + + /** + * @author Eike Stepper + * @since 4.1 + */ + protected class ReceiveSerializer extends QueueWorkerWorkSerializer + { + // CompletionWorkSerializer throws "One command already pending" + // CompletionWorkSerializer + // AsynchronousWorkSerializer + // SynchronousWorkSerializer + + @Override + protected String getThreadName() + { + return "ReceiveSerializer-" + Channel.this; //$NON-NLS-1$ + } + } + + /** + * @author Eike Stepper + */ + protected class ReceiverWork implements Runnable + { + private final IBuffer buffer; + + /** + * @since 3.0 + */ + public ReceiverWork(IBuffer buffer) + { + this.buffer = buffer; + } + + public void run() + { + IBufferHandler receiveHandler = getReceiveHandler(); + if (receiveHandler != null) + { + receiveHandler.handleBuffer(buffer); + } + else + { + // Shutting down + buffer.release(); + } + } + } + + /** + * A queue that holds buffers that shall be sent. This implementation notifies observers of enqueued and dequeued + * buffers. The notification's deliberately not synchronized. It shall only be used by O&M tooling to offer (not 100% + * accurate) statistical insights + * + * @author Eike Stepper + * @since 3.0 + */ + protected class SendQueue extends ConcurrentLinkedQueue<IBuffer> + { + private static final long serialVersionUID = 1L; + + private AtomicInteger size = new AtomicInteger(); + + protected SendQueue() + { + } + + @Override + public boolean add(IBuffer o) + { + super.add(o); + added(); + return true; + } + + @Override + public boolean offer(IBuffer o) + { + super.offer(o); + added(); + return true; + } + + @Override + public IBuffer poll() + { + IBuffer result = super.poll(); + if (result != null) + { + removed(); + } + + return result; + } + + @Override + public IBuffer remove() + { + IBuffer result = super.remove(); + if (result != null) + { + removed(); + } + + return result; + } + + @Override + public boolean remove(Object o) + { + boolean result = super.remove(o); + if (result) + { + removed(); + } + + return result; + } + + private void added() + { + int queueSize = size.incrementAndGet(); + IListener[] listeners = getListeners(); + if (listeners != null) + { + fireEvent(new SendQueueEventImpl(Type.ENQUEUED, queueSize), listeners); + } + } + + private void removed() + { + int queueSize = size.decrementAndGet(); + IListener[] listeners = getListeners(); + if (listeners != null) + { + fireEvent(new SendQueueEventImpl(Type.DEQUEUED, queueSize), listeners); + } + } + } + + /** + * @author Eike Stepper + */ + private final class SendQueueEventImpl extends Event implements SendQueueEvent + { + private static final long serialVersionUID = 1L; + + private Type type; + + private final int queueSize; + + private SendQueueEventImpl(Type type, int queueSize) + { + super(Channel.this); + this.type = type; + this.queueSize = queueSize; + } + + @Override + public InternalChannel getSource() + { + return (InternalChannel)super.getSource(); + } + + public Type getType() + { + return type; + } + + public int getQueueSize() + { + return queueSize; + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java index 7187174de4..b3f0341f55 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java @@ -1,373 +1,372 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.spi.net4j;
-
-import org.eclipse.net4j.ITransportConfig;
-import org.eclipse.net4j.Net4jUtil;
-import org.eclipse.net4j.buffer.IBuffer;
-import org.eclipse.net4j.channel.ChannelException;
-import org.eclipse.net4j.channel.IChannel;
-import org.eclipse.net4j.channel.IChannelMultiplexer;
-import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.protocol.IProtocolProvider;
-import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
-import org.eclipse.net4j.util.StringUtil;
-import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException;
-import org.eclipse.net4j.util.container.Container;
-import org.eclipse.net4j.util.factory.FactoryKey;
-import org.eclipse.net4j.util.factory.IFactoryKey;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-import org.eclipse.net4j.util.security.INegotiationContext;
-
-import org.eclipse.internal.net4j.TransportConfig;
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public abstract class ChannelMultiplexer extends Container<IChannel> implements InternalChannelMultiplexer
-{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, ChannelMultiplexer.class);
-
- private ITransportConfig config;
-
- private long openChannelTimeout = IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT;
-
- @ExcludeFromDump
- private transient ConcurrentMap<Short, IChannel> channels = new ConcurrentHashMap<Short, IChannel>();
-
- @ExcludeFromDump
- private transient Set<Short> channelIDs = new HashSet<Short>();
-
- @ExcludeFromDump
- private transient int lastChannelID;
-
- public ChannelMultiplexer()
- {
- }
-
- public synchronized ITransportConfig getConfig()
- {
- if (config == null)
- {
- config = new TransportConfig(this);
- }
-
- return config;
- }
-
- public synchronized void setConfig(ITransportConfig config)
- {
- checkInactive();
- this.config = Net4jUtil.copyTransportConfig(this, config);
- }
-
- public long getOpenChannelTimeout()
- {
- if (openChannelTimeout == IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT)
- {
- return OM.BUNDLE.getDebugSupport().getDebugOption("open.channel.timeout", 10000); //$NON-NLS-1$
- }
-
- return openChannelTimeout;
- }
-
- public void setOpenChannelTimeout(long openChannelTimeout)
- {
- this.openChannelTimeout = openChannelTimeout;
- }
-
- public final InternalChannel getChannel(short channelID)
- {
- return (InternalChannel)channels.get(channelID);
- }
-
- public final Collection<IChannel> getChannels()
- {
- return channels.values();
- }
-
- @Override
- public boolean isEmpty()
- {
- return channels.isEmpty();
- }
-
- public IChannel[] getElements()
- {
- List<IChannel> list = new ArrayList<IChannel>(getChannels());
- return list.toArray(new IChannel[list.size()]);
- }
-
- public InternalChannel openChannel() throws ChannelException
- {
- return openChannel((IProtocol<?>)null);
- }
-
- public InternalChannel openChannel(String protocolID, Object infraStructure) throws ChannelException
- {
- IProtocol<?> protocol = createProtocol(protocolID, infraStructure);
- if (protocol == null)
- {
- throw new IllegalArgumentException("Unknown protocolID: " + protocolID); //$NON-NLS-1$
- }
-
- return openChannel(protocol);
- }
-
- public InternalChannel openChannel(IProtocol<?> protocol) throws ChannelException
- {
- long start = System.currentTimeMillis();
- doBeforeOpenChannel(protocol);
-
- InternalChannel channel = createChannel();
- initChannel(channel, protocol);
- channel.setID(getNextChannelID());
- addChannel(channel);
-
- try
- {
- try
- {
- long timeout = getOpenChannelTimeout() - System.currentTimeMillis() + start;
- if (timeout <= 0)
- {
- throw new TimeoutRuntimeException();
- }
-
- registerChannelWithPeer(channel.getID(), timeout, protocol);
- }
- catch (TimeoutRuntimeException ex)
- {
- // Adjust the message for the complete timeout time
- String message = "Channel registration timeout after " + getOpenChannelTimeout() + " milliseconds";
- throw new TimeoutRuntimeException(message, ex);
- }
- }
- catch (ChannelException ex)
- {
- throw ex;
- }
- catch (Exception ex)
- {
- throw new ChannelException(ex);
- }
-
- return channel;
- }
-
- public InternalChannel inverseOpenChannel(short channelID, String protocolID)
- {
- IProtocol<?> protocol = createProtocol(protocolID, null);
-
- InternalChannel channel = createChannel();
- initChannel(channel, protocol);
- channel.setID(channelID);
- addChannel(channel);
- return channel;
- }
-
- public void closeChannel(InternalChannel channel) throws ChannelException
- {
- InternalChannel internalChannel = channel;
- deregisterChannelFromPeer(internalChannel);
- removeChannel(internalChannel);
- }
-
- public void inverseCloseChannel(short channelID) throws ChannelException
- {
- InternalChannel channel = getChannel(channelID);
- LifecycleUtil.deactivate(channel);
- }
-
- protected InternalChannel createChannel()
- {
- return new Channel();
- }
-
- protected void initChannel(InternalChannel channel, IProtocol<?> protocol)
- {
- channel.setMultiplexer(this);
- channel.setReceiveExecutor(getConfig().getReceiveExecutor());
- if (protocol != null)
- {
- protocol.setChannel(channel);
- LifecycleUtil.activate(protocol);
- if (TRACER.isEnabled())
- {
- String protocolType = protocol.getType();
- TRACER.format("Opening channel with protocol {0}", protocolType); //$NON-NLS-1$
- }
-
- channel.setReceiveHandler(protocol);
- }
- else
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Opening channel without protocol"); //$NON-NLS-1$
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- protected <INFRA_STRUCTURE> IProtocol<INFRA_STRUCTURE> createProtocol(String type, INFRA_STRUCTURE infraStructure)
- {
- if (StringUtil.isEmpty(type))
- {
- return null;
- }
-
- IProtocolProvider protocolProvider = getConfig().getProtocolProvider();
- if (protocolProvider == null)
- {
- throw new ChannelException("No protocol provider configured"); //$NON-NLS-1$
- }
-
- IProtocol<INFRA_STRUCTURE> protocol = (IProtocol<INFRA_STRUCTURE>)protocolProvider.getProtocol(type);
- if (protocol == null)
- {
- throw new ChannelException("Invalid protocol factory: " + type); //$NON-NLS-1$
- }
-
- if (infraStructure != null)
- {
- protocol.setInfraStructure(infraStructure);
- }
-
- return protocol;
- }
-
- protected IFactoryKey createProtocolFactoryKey(String type)
- {
- switch (getLocation())
- {
- case SERVER:
- return new FactoryKey(ServerProtocolFactory.PRODUCT_GROUP, type);
- case CLIENT:
- return new FactoryKey(ClientProtocolFactory.PRODUCT_GROUP, type);
- default:
- throw new IllegalStateException();
- }
- }
-
- protected void doBeforeOpenChannel(IProtocol<?> protocol)
- {
- // Do nothing
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- IChannel[] channels;
- synchronized (channelIDs)
- {
- channels = getElements();
- }
-
- for (IChannel channel : channels)
- {
- LifecycleUtil.deactivate(channel);
- }
-
- synchronized (channelIDs)
- {
- this.channels.clear();
- }
-
- super.doDeactivate();
- }
-
- protected abstract INegotiationContext createNegotiationContext();
-
- protected abstract void registerChannelWithPeer(short channelID, long timeout, IProtocol<?> protocol)
- throws ChannelException;
-
- protected abstract void deregisterChannelFromPeer(InternalChannel channel) throws ChannelException;
-
- private short getNextChannelID()
- {
- synchronized (channelIDs)
- {
- int start = lastChannelID;
- int maxValue = Short.MAX_VALUE;
- for (;;)
- {
- ++lastChannelID;
- if (lastChannelID == start)
- {
- throw new ChannelException("Too many channels"); //$NON-NLS-1$
- }
-
- if (lastChannelID > maxValue)
- {
- lastChannelID = 1;
- }
-
- short id = (short)(isClient() ? lastChannelID : -lastChannelID);
- if (channelIDs.add(id))
- {
- return id;
- }
- }
- }
- }
-
- private void addChannel(InternalChannel channel)
- {
- short channelID = channel.getID();
- if (channelID == RESERVED_CHANNEL || channelID == IBuffer.NO_CHANNEL)
- {
- throw new ChannelException("Invalid channel ID: " + channelID); //$NON-NLS-1$
- }
-
- channels.put(channelID, channel);
- LifecycleUtil.activate(channel);
- fireElementAddedEvent(channel);
- }
-
- private void removeChannel(InternalChannel channel)
- {
- try
- {
- short channelID = channel.getID();
- boolean removed;
- synchronized (channelIDs)
- {
- removed = channels.remove(channelID) != null;
- if (removed)
- {
- channelIDs.remove(channelID);
- }
- }
-
- if (removed)
- {
- fireElementRemovedEvent(channel);
- }
- }
- catch (RuntimeException ex)
- {
- OM.LOG.error(ex);
- throw ex;
- }
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.spi.net4j; + +import org.eclipse.net4j.ITransportConfig; +import org.eclipse.net4j.Net4jUtil; +import org.eclipse.net4j.buffer.IBuffer; +import org.eclipse.net4j.channel.ChannelException; +import org.eclipse.net4j.channel.IChannel; +import org.eclipse.net4j.channel.IChannelMultiplexer; +import org.eclipse.net4j.protocol.IProtocol; +import org.eclipse.net4j.protocol.IProtocolProvider; +import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; +import org.eclipse.net4j.util.StringUtil; +import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException; +import org.eclipse.net4j.util.container.Container; +import org.eclipse.net4j.util.factory.FactoryKey; +import org.eclipse.net4j.util.factory.IFactoryKey; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.trace.ContextTracer; +import org.eclipse.net4j.util.security.INegotiationContext; + +import org.eclipse.internal.net4j.TransportConfig; +import org.eclipse.internal.net4j.bundle.OM; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * @author Eike Stepper + * @since 2.0 + */ +public abstract class ChannelMultiplexer extends Container<IChannel> implements InternalChannelMultiplexer +{ + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, ChannelMultiplexer.class); + + private ITransportConfig config; + + private long openChannelTimeout = IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT; + + private ConcurrentMap<Short, IChannel> channels = new ConcurrentHashMap<Short, IChannel>(); + + @ExcludeFromDump + private transient Set<Short> channelIDs = new HashSet<Short>(); + + @ExcludeFromDump + private transient int lastChannelID; + + public ChannelMultiplexer() + { + } + + public synchronized ITransportConfig getConfig() + { + if (config == null) + { + config = new TransportConfig(this); + } + + return config; + } + + public synchronized void setConfig(ITransportConfig config) + { + checkInactive(); + this.config = Net4jUtil.copyTransportConfig(this, config); + } + + public long getOpenChannelTimeout() + { + if (openChannelTimeout == IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT) + { + return OM.BUNDLE.getDebugSupport().getDebugOption("open.channel.timeout", 10000); //$NON-NLS-1$ + } + + return openChannelTimeout; + } + + public void setOpenChannelTimeout(long openChannelTimeout) + { + this.openChannelTimeout = openChannelTimeout; + } + + public final InternalChannel getChannel(short channelID) + { + return (InternalChannel)channels.get(channelID); + } + + public final Collection<IChannel> getChannels() + { + return channels.values(); + } + + @Override + public boolean isEmpty() + { + return channels.isEmpty(); + } + + public IChannel[] getElements() + { + List<IChannel> list = new ArrayList<IChannel>(getChannels()); + return list.toArray(new IChannel[list.size()]); + } + + public InternalChannel openChannel() throws ChannelException + { + return openChannel((IProtocol<?>)null); + } + + public InternalChannel openChannel(String protocolID, Object infraStructure) throws ChannelException + { + IProtocol<?> protocol = createProtocol(protocolID, infraStructure); + if (protocol == null) + { + throw new IllegalArgumentException("Unknown protocolID: " + protocolID); //$NON-NLS-1$ + } + + return openChannel(protocol); + } + + public InternalChannel openChannel(IProtocol<?> protocol) throws ChannelException + { + long start = System.currentTimeMillis(); + doBeforeOpenChannel(protocol); + + InternalChannel channel = createChannel(); + initChannel(channel, protocol); + channel.setID(getNextChannelID()); + addChannel(channel); + + try + { + try + { + long timeout = getOpenChannelTimeout() - System.currentTimeMillis() + start; + if (timeout <= 0) + { + throw new TimeoutRuntimeException(); + } + + registerChannelWithPeer(channel.getID(), timeout, protocol); + } + catch (TimeoutRuntimeException ex) + { + // Adjust the message for the complete timeout time + String message = "Channel registration timeout after " + getOpenChannelTimeout() + " milliseconds"; + throw new TimeoutRuntimeException(message, ex); + } + } + catch (ChannelException ex) + { + throw ex; + } + catch (Exception ex) + { + throw new ChannelException(ex); + } + + return channel; + } + + public InternalChannel inverseOpenChannel(short channelID, String protocolID) + { + IProtocol<?> protocol = createProtocol(protocolID, null); + + InternalChannel channel = createChannel(); + initChannel(channel, protocol); + channel.setID(channelID); + addChannel(channel); + return channel; + } + + public void closeChannel(InternalChannel channel) throws ChannelException + { + InternalChannel internalChannel = channel; + deregisterChannelFromPeer(internalChannel); + removeChannel(internalChannel); + } + + public void inverseCloseChannel(short channelID) throws ChannelException + { + InternalChannel channel = getChannel(channelID); + LifecycleUtil.deactivate(channel); + } + + protected InternalChannel createChannel() + { + return new Channel(); + } + + protected void initChannel(InternalChannel channel, IProtocol<?> protocol) + { + channel.setMultiplexer(this); + channel.setReceiveExecutor(getConfig().getReceiveExecutor()); + if (protocol != null) + { + protocol.setChannel(channel); + LifecycleUtil.activate(protocol); + if (TRACER.isEnabled()) + { + String protocolType = protocol.getType(); + TRACER.format("Opening channel with protocol {0}", protocolType); //$NON-NLS-1$ + } + + channel.setReceiveHandler(protocol); + } + else + { + if (TRACER.isEnabled()) + { + TRACER.trace("Opening channel without protocol"); //$NON-NLS-1$ + } + } + } + + @SuppressWarnings("unchecked") + protected <INFRA_STRUCTURE> IProtocol<INFRA_STRUCTURE> createProtocol(String type, INFRA_STRUCTURE infraStructure) + { + if (StringUtil.isEmpty(type)) + { + return null; + } + + IProtocolProvider protocolProvider = getConfig().getProtocolProvider(); + if (protocolProvider == null) + { + throw new ChannelException("No protocol provider configured"); //$NON-NLS-1$ + } + + IProtocol<INFRA_STRUCTURE> protocol = (IProtocol<INFRA_STRUCTURE>)protocolProvider.getProtocol(type); + if (protocol == null) + { + throw new ChannelException("Invalid protocol factory: " + type); //$NON-NLS-1$ + } + + if (infraStructure != null) + { + protocol.setInfraStructure(infraStructure); + } + + return protocol; + } + + protected IFactoryKey createProtocolFactoryKey(String type) + { + switch (getLocation()) + { + case SERVER: + return new FactoryKey(ServerProtocolFactory.PRODUCT_GROUP, type); + case CLIENT: + return new FactoryKey(ClientProtocolFactory.PRODUCT_GROUP, type); + default: + throw new IllegalStateException(); + } + } + + protected void doBeforeOpenChannel(IProtocol<?> protocol) + { + // Do nothing + } + + @Override + protected void doDeactivate() throws Exception + { + IChannel[] channels; + synchronized (channelIDs) + { + channels = getElements(); + } + + for (IChannel channel : channels) + { + LifecycleUtil.deactivate(channel); + } + + synchronized (channelIDs) + { + this.channels.clear(); + } + + super.doDeactivate(); + } + + protected abstract INegotiationContext createNegotiationContext(); + + protected abstract void registerChannelWithPeer(short channelID, long timeout, IProtocol<?> protocol) + throws ChannelException; + + protected abstract void deregisterChannelFromPeer(InternalChannel channel) throws ChannelException; + + private short getNextChannelID() + { + synchronized (channelIDs) + { + int start = lastChannelID; + int maxValue = Short.MAX_VALUE; + for (;;) + { + ++lastChannelID; + if (lastChannelID == start) + { + throw new ChannelException("Too many channels"); //$NON-NLS-1$ + } + + if (lastChannelID > maxValue) + { + lastChannelID = 1; + } + + short id = (short)(isClient() ? lastChannelID : -lastChannelID); + if (channelIDs.add(id)) + { + return id; + } + } + } + } + + private void addChannel(InternalChannel channel) + { + short channelID = channel.getID(); + if (channelID == RESERVED_CHANNEL || channelID == IBuffer.NO_CHANNEL) + { + throw new ChannelException("Invalid channel ID: " + channelID); //$NON-NLS-1$ + } + + channels.put(channelID, channel); + LifecycleUtil.activate(channel); + fireElementAddedEvent(channel); + } + + private void removeChannel(InternalChannel channel) + { + try + { + short channelID = channel.getID(); + boolean removed; + synchronized (channelIDs) + { + removed = channels.remove(channelID) != null; + if (removed) + { + channelIDs.remove(channelID); + } + } + + if (removed) + { + fireElementRemovedEvent(channel); + } + } + catch (RuntimeException ex) + { + OM.LOG.error(ex); + throw ex; + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java index 424e580e2e..c33de7bdc9 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java @@ -1,430 +1,433 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.spi.net4j;
-
-import org.eclipse.net4j.buffer.IBuffer;
-import org.eclipse.net4j.channel.ChannelException;
-import org.eclipse.net4j.connector.ConnectorException;
-import org.eclipse.net4j.connector.ConnectorState;
-import org.eclipse.net4j.connector.IConnector;
-import org.eclipse.net4j.connector.IConnectorStateEvent;
-import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
-import org.eclipse.net4j.util.event.Event;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.log.OMLogger;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-import org.eclipse.net4j.util.security.INegotiationContext;
-import org.eclipse.net4j.util.security.INegotiator;
-import org.eclipse.net4j.util.security.NegotiationException;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.text.MessageFormat;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public abstract class Connector extends ChannelMultiplexer implements InternalConnector
-{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, Connector.class);
-
- private String userID;
-
- private transient ConnectorState connectorState = ConnectorState.DISCONNECTED;
-
- @ExcludeFromDump
- private transient CountDownLatch finishedConnecting;
-
- @ExcludeFromDump
- private transient CountDownLatch finishedNegotiating;
-
- @ExcludeFromDump
- private transient INegotiationContext negotiationContext;
-
- @ExcludeFromDump
- private transient NegotiationException negotiationException;
-
- public Connector()
- {
- }
-
- public INegotiator getNegotiator()
- {
- return getConfig().getNegotiator();
- }
-
- public void setNegotiator(INegotiator negotiator)
- {
- getConfig().setNegotiator(negotiator);
- }
-
- public INegotiationContext getNegotiationContext()
- {
- return negotiationContext;
- }
-
- public boolean isClient()
- {
- return getLocation() == Location.CLIENT;
- }
-
- public boolean isServer()
- {
- return getLocation() == Location.SERVER;
- }
-
- public String getUserID()
- {
- return userID;
- }
-
- public void setUserID(String userID)
- {
- checkState(getState() != ConnectorState.CONNECTED, "Connector is already connected"); //$NON-NLS-1$
- if (TRACER.isEnabled())
- {
- TRACER.format("Setting userID {0} for {1}", userID, this); //$NON-NLS-1$
- }
-
- this.userID = userID;
- }
-
- public ConnectorState getState()
- {
- return connectorState;
- }
-
- public void setState(ConnectorState newState) throws ConnectorException
- {
- ConnectorState oldState = getState();
- if (newState != oldState)
- {
- if (TRACER.isEnabled())
- {
- TRACER.format("Setting state {0} (was {1}) for {2}", newState, oldState.toString().toLowerCase(), this); //$NON-NLS-1$
- }
-
- connectorState = newState;
- switch (newState)
- {
- case DISCONNECTED:
- if (finishedConnecting != null)
- {
- finishedConnecting.countDown();
- finishedConnecting = null;
- }
-
- if (finishedNegotiating != null)
- {
- finishedNegotiating.countDown();
- finishedNegotiating = null;
- }
-
- break;
-
- case CONNECTING:
- finishedConnecting = new CountDownLatch(1);
- finishedNegotiating = new CountDownLatch(1);
- // The concrete implementation must advance state to NEGOTIATING or CONNECTED
- break;
-
- case NEGOTIATING:
- finishedConnecting.countDown();
- negotiationContext = createNegotiationContext();
- getNegotiator().negotiate(negotiationContext);
- break;
-
- case CONNECTED:
- negotiationContext = null;
- deferredActivate(true);
- finishedConnecting.countDown();
- finishedNegotiating.countDown();
- break;
- }
-
- fireEvent(new ConnectorStateEvent(this, oldState, newState));
- }
- }
-
- public boolean isDisconnected()
- {
- return connectorState == ConnectorState.DISCONNECTED;
- }
-
- public boolean isConnecting()
- {
- return connectorState == ConnectorState.CONNECTING;
- }
-
- public boolean isNegotiating()
- {
- return connectorState == ConnectorState.NEGOTIATING;
- }
-
- public boolean isConnected()
- {
- if (negotiationException != null)
- {
- throw new ConnectorException("Connector negotiation failed", negotiationException); //$NON-NLS-1$
- }
-
- return connectorState == ConnectorState.CONNECTED;
- }
-
- public void connectAsync() throws ConnectorException
- {
- try
- {
- activate();
- }
- catch (ConnectorException ex)
- {
- throw ex;
- }
- catch (Exception ex)
- {
- throw new ConnectorException(ex);
- }
- }
-
- /**
- * @since 4.0
- */
- public void waitForConnection(long timeout) throws ConnectorException
- {
- String message = "Connection timeout after " + timeout + " milliseconds";
- final long MAX_POLL_INTERVAL = 100L;
- boolean withTimeout = timeout != NO_TIMEOUT;
-
- try
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Waiting for connection..."); //$NON-NLS-1$
- }
-
- for (;;)
- {
- long t = MAX_POLL_INTERVAL;
- if (withTimeout)
- {
- t = Math.min(MAX_POLL_INTERVAL, timeout);
- timeout -= MAX_POLL_INTERVAL;
- }
-
- if (t <= 0)
- {
- break;
- }
-
- if (finishedNegotiating == null)
- {
- break;
- }
-
- if (finishedNegotiating.await(t, TimeUnit.MILLISECONDS))
- {
- break;
- }
- }
-
- if (!isConnected())
- {
- throw new ConnectorException(message);
- }
- }
- catch (ConnectorException ex)
- {
- setState(ConnectorState.DISCONNECTED);
- throw ex;
- }
- catch (Exception ex)
- {
- setState(ConnectorState.DISCONNECTED);
- throw new ConnectorException(ex);
- }
- }
-
- /**
- * @since 4.0
- */
- public void connect(long timeout) throws ConnectorException
- {
- connectAsync();
- waitForConnection(timeout);
- }
-
- /**
- * @since 4.0
- */
- public void connect() throws ConnectorException
- {
- connect(NO_TIMEOUT);
- }
-
- public void close()
- {
- LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG);
- }
-
- public boolean isClosed()
- {
- return !isActive();
- }
-
- public short getBufferCapacity()
- {
- return getConfig().getBufferProvider().getBufferCapacity();
- }
-
- public IBuffer provideBuffer()
- {
- return getConfig().getBufferProvider().provideBuffer();
- }
-
- public void retainBuffer(IBuffer buffer)
- {
- getConfig().getBufferProvider().retainBuffer(buffer);
- }
-
- protected void leaveConnecting()
- {
- if (getNegotiator() == null)
- {
- setState(ConnectorState.CONNECTED);
- }
- else
- {
- setState(ConnectorState.NEGOTIATING);
- }
- }
-
- @Override
- protected abstract INegotiationContext createNegotiationContext();
-
- protected NegotiationException getNegotiationException()
- {
- return negotiationException;
- }
-
- protected void setNegotiationException(NegotiationException negotiationException)
- {
- this.negotiationException = negotiationException;
- }
-
- @Override
- protected void initChannel(InternalChannel channel, IProtocol<?> protocol)
- {
- super.initChannel(channel, protocol);
- channel.setUserID(getUserID());
- }
-
- @Override
- protected void deregisterChannelFromPeer(InternalChannel channel) throws ChannelException
- {
- }
-
- public Location getLocation()
- {
- return null;
- }
-
- public String getURL()
- {
- return null;
- }
-
- @Override
- protected boolean isDeferredActivation()
- {
- return true;
- }
-
- @Override
- protected void doBeforeOpenChannel(IProtocol<?> protocol)
- {
- super.doBeforeOpenChannel(protocol);
- long timeout = getOpenChannelTimeout();
- waitForConnection(timeout);
- }
-
- @Override
- protected void doBeforeActivate() throws Exception
- {
- super.doBeforeActivate();
- checkState(getConfig().getBufferProvider(), "getConfig().getBufferProvider()"); //$NON-NLS-1$
-
- if (userID != null && getConfig().getNegotiator() == null)
- {
- throw new IllegalStateException("A user ID on this connector requires a negotiator"); //$NON-NLS-1$
- }
- }
-
- @Override
- protected void doActivate() throws Exception
- {
- super.doActivate();
- setState(ConnectorState.CONNECTING);
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- setState(ConnectorState.DISCONNECTED);
- super.doDeactivate();
- }
-
- /**
- * @author Eike Stepper
- */
- private static class ConnectorStateEvent extends Event implements IConnectorStateEvent
- {
- private static final long serialVersionUID = 1L;
-
- private ConnectorState oldState;
-
- private ConnectorState newState;
-
- public ConnectorStateEvent(IConnector source, ConnectorState oldState, ConnectorState newState)
- {
- super(source);
- this.oldState = oldState;
- this.newState = newState;
- }
-
- @Override
- public IConnector getSource()
- {
- return (IConnector)super.getSource();
- }
-
- public ConnectorState getOldState()
- {
- return oldState;
- }
-
- public ConnectorState getNewState()
- {
- return newState;
- }
-
- @Override
- public String toString()
- {
- return MessageFormat.format("ConnectorStateEvent[source={0}, oldState={1}, newState={2}]", getSource(), //$NON-NLS-1$
- getOldState(), getNewState());
- }
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.spi.net4j; + +import org.eclipse.net4j.buffer.IBuffer; +import org.eclipse.net4j.channel.ChannelException; +import org.eclipse.net4j.connector.ConnectorException; +import org.eclipse.net4j.connector.ConnectorState; +import org.eclipse.net4j.connector.IConnector; +import org.eclipse.net4j.connector.IConnectorStateEvent; +import org.eclipse.net4j.protocol.IProtocol; +import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; +import org.eclipse.net4j.util.event.Event; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.log.OMLogger; +import org.eclipse.net4j.util.om.trace.ContextTracer; +import org.eclipse.net4j.util.security.INegotiationContext; +import org.eclipse.net4j.util.security.INegotiator; +import org.eclipse.net4j.util.security.NegotiationException; + +import org.eclipse.internal.net4j.bundle.OM; + +import java.text.MessageFormat; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @author Eike Stepper + * @since 2.0 + */ +public abstract class Connector extends ChannelMultiplexer implements InternalConnector +{ + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, Connector.class); + + private String userID; + + private transient ConnectorState connectorState = ConnectorState.DISCONNECTED; + + @ExcludeFromDump + private transient CountDownLatch finishedConnecting; + + @ExcludeFromDump + private transient CountDownLatch finishedNegotiating; + + @ExcludeFromDump + private transient INegotiationContext negotiationContext; + + @ExcludeFromDump + private transient NegotiationException negotiationException; + + public Connector() + { + } + + public INegotiator getNegotiator() + { + return getConfig().getNegotiator(); + } + + public void setNegotiator(INegotiator negotiator) + { + getConfig().setNegotiator(negotiator); + } + + public INegotiationContext getNegotiationContext() + { + return negotiationContext; + } + + public boolean isClient() + { + return getLocation() == Location.CLIENT; + } + + public boolean isServer() + { + return getLocation() == Location.SERVER; + } + + public String getUserID() + { + return userID; + } + + public void setUserID(String userID) + { + checkState(getState() != ConnectorState.CONNECTED, "Connector is already connected"); //$NON-NLS-1$ + if (TRACER.isEnabled()) + { + TRACER.format("Setting userID {0} for {1}", userID, this); //$NON-NLS-1$ + } + + this.userID = userID; + } + + public ConnectorState getState() + { + return connectorState; + } + + public void setState(ConnectorState newState) throws ConnectorException + { + ConnectorState oldState = getState(); + if (newState != oldState) + { + if (TRACER.isEnabled()) + { + TRACER.format("Setting state {0} (was {1}) for {2}", newState, oldState.toString().toLowerCase(), this); //$NON-NLS-1$ + } + + connectorState = newState; + switch (newState) + { + case DISCONNECTED: + if (finishedConnecting != null) + { + finishedConnecting.countDown(); + finishedConnecting = null; + } + + if (finishedNegotiating != null) + { + finishedNegotiating.countDown(); + finishedNegotiating = null; + } + + break; + + case CONNECTING: + finishedConnecting = new CountDownLatch(1); + finishedNegotiating = new CountDownLatch(1); + // The concrete implementation must advance state to NEGOTIATING or CONNECTED + break; + + case NEGOTIATING: + finishedConnecting.countDown(); + negotiationContext = createNegotiationContext(); + getNegotiator().negotiate(negotiationContext); + break; + + case CONNECTED: + negotiationContext = null; + deferredActivate(true); + finishedConnecting.countDown(); + finishedNegotiating.countDown(); + break; + } + + fireEvent(new ConnectorStateEvent(this, oldState, newState)); + } + } + + public boolean isDisconnected() + { + return connectorState == ConnectorState.DISCONNECTED; + } + + public boolean isConnecting() + { + return connectorState == ConnectorState.CONNECTING; + } + + public boolean isNegotiating() + { + return connectorState == ConnectorState.NEGOTIATING; + } + + public boolean isConnected() + { + if (negotiationException != null) + { + throw new ConnectorException("Connector negotiation failed", negotiationException); //$NON-NLS-1$ + } + + return connectorState == ConnectorState.CONNECTED; + } + + public void connectAsync() throws ConnectorException + { + try + { + activate(); + } + catch (ConnectorException ex) + { + throw ex; + } + catch (Exception ex) + { + throw new ConnectorException(ex); + } + } + + /** + * @since 4.0 + */ + public void waitForConnection(long timeout) throws ConnectorException + { + String message = "Connection timeout after " + timeout + " milliseconds"; + final long MAX_POLL_INTERVAL = 100L; + boolean withTimeout = timeout != NO_TIMEOUT; + + try + { + if (TRACER.isEnabled()) + { + TRACER.trace("Waiting for connection..."); //$NON-NLS-1$ + } + + for (;;) + { + long t = MAX_POLL_INTERVAL; + if (withTimeout) + { + t = Math.min(MAX_POLL_INTERVAL, timeout); + timeout -= MAX_POLL_INTERVAL; + } + + if (t <= 0) + { + break; + } + + if (finishedNegotiating == null) + { + break; + } + + if (finishedNegotiating.await(t, TimeUnit.MILLISECONDS)) + { + break; + } + } + + if (!isConnected()) + { + throw new ConnectorException(message); + } + } + catch (ConnectorException ex) + { + setState(ConnectorState.DISCONNECTED); + throw ex; + } + catch (Exception ex) + { + setState(ConnectorState.DISCONNECTED); + throw new ConnectorException(ex); + } + } + + /** + * @since 4.0 + */ + public void connect(long timeout) throws ConnectorException + { + connectAsync(); + waitForConnection(timeout); + } + + /** + * @since 4.0 + */ + public void connect() throws ConnectorException + { + connect(NO_TIMEOUT); + } + + public void close() + { + LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG); + } + + public boolean isClosed() + { + return !isActive(); + } + + public short getBufferCapacity() + { + return getConfig().getBufferProvider().getBufferCapacity(); + } + + public IBuffer provideBuffer() + { + return getConfig().getBufferProvider().provideBuffer(); + } + + public void retainBuffer(IBuffer buffer) + { + getConfig().getBufferProvider().retainBuffer(buffer); + } + + protected void leaveConnecting() + { + if (getNegotiator() == null) + { + setState(ConnectorState.CONNECTED); + } + else + { + setState(ConnectorState.NEGOTIATING); + } + } + + @Override + protected abstract INegotiationContext createNegotiationContext(); + + protected NegotiationException getNegotiationException() + { + return negotiationException; + } + + protected void setNegotiationException(NegotiationException negotiationException) + { + this.negotiationException = negotiationException; + } + + @Override + protected void initChannel(InternalChannel channel, IProtocol<?> protocol) + { + super.initChannel(channel, protocol); + channel.setUserID(getUserID()); + } + + @Override + protected void deregisterChannelFromPeer(InternalChannel channel) throws ChannelException + { + } + + public Location getLocation() + { + return null; + } + + public String getURL() + { + return null; + } + + /** + * @since 4.1 + */ + @Override + public boolean isDeferredActivation() + { + return true; + } + + @Override + protected void doBeforeOpenChannel(IProtocol<?> protocol) + { + super.doBeforeOpenChannel(protocol); + long timeout = getOpenChannelTimeout(); + waitForConnection(timeout); + } + + @Override + protected void doBeforeActivate() throws Exception + { + super.doBeforeActivate(); + checkState(getConfig().getBufferProvider(), "getConfig().getBufferProvider()"); //$NON-NLS-1$ + + if (userID != null && getConfig().getNegotiator() == null) + { + throw new IllegalStateException("A user ID on this connector requires a negotiator"); //$NON-NLS-1$ + } + } + + @Override + protected void doActivate() throws Exception + { + super.doActivate(); + setState(ConnectorState.CONNECTING); + } + + @Override + protected void doDeactivate() throws Exception + { + setState(ConnectorState.DISCONNECTED); + super.doDeactivate(); + } + + /** + * @author Eike Stepper + */ + private static class ConnectorStateEvent extends Event implements IConnectorStateEvent + { + private static final long serialVersionUID = 1L; + + private ConnectorState oldState; + + private ConnectorState newState; + + public ConnectorStateEvent(IConnector source, ConnectorState oldState, ConnectorState newState) + { + super(source); + this.oldState = oldState; + this.newState = newState; + } + + @Override + public IConnector getSource() + { + return (IConnector)super.getSource(); + } + + public ConnectorState getOldState() + { + return oldState; + } + + public ConnectorState getNewState() + { + return newState; + } + + @Override + public String toString() + { + return MessageFormat.format("ConnectorStateEvent[source={0}, oldState={1}, newState={2}]", getSource(), //$NON-NLS-1$ + getOldState(), getNewState()); + } + } +} |