Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2012-06-07 10:14:51 +0000
committerEike Stepper2012-06-07 10:14:51 +0000
commita25531ae8a5c85586fdb590c0f806be3a807b634 (patch)
tree7849f8065be919611dabd87403f9bf5a1743cee7 /plugins/org.eclipse.net4j
parentb48ccc46da60cf75aef219ae635c627fb49d3cf2 (diff)
downloadcdo-a25531ae8a5c85586fdb590c0f806be3a807b634.tar.gz
cdo-a25531ae8a5c85586fdb590c0f806be3a807b634.tar.xz
cdo-a25531ae8a5c85586fdb590c0f806be3a807b634.zip
[381472] Design a repository administration API
https://bugs.eclipse.org/bugs/show_bug.cgi?id=381472
Diffstat (limited to 'plugins/org.eclipse.net4j')
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java978
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java745
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java863
3 files changed, 1296 insertions, 1290 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
index 7dd1c77a4a..44b0ccc8f5 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
@@ -1,487 +1,491 @@
-/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- * Andre Dietisheim - maintenance
- */
-package org.eclipse.spi.net4j;
-
-import org.eclipse.net4j.buffer.BufferState;
-import org.eclipse.net4j.buffer.IBuffer;
-import org.eclipse.net4j.buffer.IBufferHandler;
-import org.eclipse.net4j.channel.IChannelMultiplexer;
-import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.util.concurrent.IWorkSerializer;
-import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer;
-import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer;
-import org.eclipse.net4j.util.event.Event;
-import org.eclipse.net4j.util.event.IListener;
-import org.eclipse.net4j.util.lifecycle.Lifecycle;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.log.OMLogger;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import org.eclipse.spi.net4j.InternalChannel.SendQueueEvent.Type;
-
-import java.text.MessageFormat;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public class Channel extends Lifecycle implements InternalChannel
-{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CHANNEL, Channel.class);
-
- private String userID;
-
- private InternalChannelMultiplexer channelMultiplexer;
-
- private short id = IBuffer.NO_CHANNEL;
-
- private ExecutorService receiveExecutor;
-
- /**
- * The external handler for buffers passed from the {@link #connector}.
- */
- private IBufferHandler receiveHandler;
-
- private IWorkSerializer receiveSerializer;
-
- private transient Queue<IBuffer> sendQueue;
-
- private transient long sentBuffers;
-
- private transient long receivedBuffers;
-
- public Channel()
- {
- }
-
- public String getUserID()
- {
- return userID;
- }
-
- public void setUserID(String userID)
- {
- this.userID = userID;
- }
-
- public Location getLocation()
- {
- return channelMultiplexer.getLocation();
- }
-
- public boolean isClient()
- {
- return channelMultiplexer.isClient();
- }
-
- public boolean isServer()
- {
- return channelMultiplexer.isServer();
- }
-
- public IChannelMultiplexer getMultiplexer()
- {
- return channelMultiplexer;
- }
-
- public void setMultiplexer(IChannelMultiplexer channelMultiplexer)
- {
- this.channelMultiplexer = (InternalChannelMultiplexer)channelMultiplexer;
- }
-
- public short getID()
- {
- return id;
- }
-
- public void setID(short id)
- {
- checkArg(id != IBuffer.NO_CHANNEL, "id == IBuffer.NO_CHANNEL"); //$NON-NLS-1$
- this.id = id;
- }
-
- public ExecutorService getReceiveExecutor()
- {
- return receiveExecutor;
- }
-
- public void setReceiveExecutor(ExecutorService receiveExecutor)
- {
- this.receiveExecutor = receiveExecutor;
- }
-
- public IBufferHandler getReceiveHandler()
- {
- return receiveHandler;
- }
-
- public void setReceiveHandler(IBufferHandler receiveHandler)
- {
- this.receiveHandler = receiveHandler;
- }
-
- /**
- * @since 3.0
- */
- public long getSentBuffers()
- {
- return sentBuffers;
- }
-
- /**
- * @since 3.0
- */
- public long getReceivedBuffers()
- {
- return receivedBuffers;
- }
-
- public Queue<IBuffer> getSendQueue()
- {
- return sendQueue;
- }
-
- public void sendBuffer(IBuffer buffer)
- {
- handleBuffer(buffer);
- }
-
- /**
- * Handles the given buffer. Ensures it is in the PUTTING state (otherwise ignores it) and sends it on behalf of the
- * send queue.
- *
- * @see IBuffer#getState
- * @see BufferState#PUTTING
- * @see Channel#sendQueue
- */
- public void handleBuffer(IBuffer buffer)
- {
- BufferState state = buffer.getState();
- if (state != BufferState.PUTTING)
- {
- OM.LOG.warn("Ignoring buffer in state == " + state + ": " + this); //$NON-NLS-1$ //$NON-NLS-2$
- return;
- }
-
- if (TRACER.isEnabled())
- {
- TRACER.format("Handling buffer: {0} --> {1}", buffer, this); //$NON-NLS-1$
- }
-
- if (sendQueue == null)
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Ignoring buffer because sendQueue == null: " + this); //$NON-NLS-1$
- }
-
- buffer.release();
- }
- else
- {
- sendQueue.add(buffer);
- ++sentBuffers;
- channelMultiplexer.multiplexChannel(this);
- }
- }
-
- /**
- * Handles a buffer sent by the multiplexer. Adds work to the receive queue or releases the buffer.
- *
- * @see InternalChannelMultiplexer#multiplexChannel
- * @see IWorkSerializer
- * @see ReceiverWork
- */
- public void handleBufferFromMultiplexer(IBuffer buffer)
- {
- if (receiveHandler != null)
- {
- if (TRACER.isEnabled())
- {
- TRACER.format("Handling buffer from multiplexer: {0} --> {1}", buffer, this); //$NON-NLS-1$
- }
-
- ++receivedBuffers;
- receiveSerializer.addWork(createReceiverWork(buffer));
- }
- else
- {
- // Shutting down
- buffer.release();
- }
- }
-
- protected ReceiverWork createReceiverWork(IBuffer buffer)
- {
- return new ReceiverWork(buffer);
- }
-
- public short getBufferCapacity()
- {
- return channelMultiplexer.getBufferCapacity();
- }
-
- public IBuffer provideBuffer()
- {
- return channelMultiplexer.provideBuffer();
- }
-
- public void retainBuffer(IBuffer buffer)
- {
- channelMultiplexer.retainBuffer(buffer);
- }
-
- @Override
- public String toString()
- {
- if (receiveHandler instanceof IProtocol)
- {
- IProtocol<?> protocol = (IProtocol<?>)receiveHandler;
- return MessageFormat.format("Channel[{0}, {1}, {2}]", id, getLocation(), protocol.getType()); //$NON-NLS-1$
- }
-
- return MessageFormat.format("Channel[{0}, {1}]", id, getLocation()); //$NON-NLS-1$
- }
-
- @Override
- protected void doBeforeActivate() throws Exception
- {
- super.doBeforeActivate();
- checkState(id != IBuffer.NO_CHANNEL, "channelID == NO_CHANNEL"); //$NON-NLS-1$
- checkState(channelMultiplexer, "channelMultiplexer"); //$NON-NLS-1$
- }
-
- @Override
- protected void doActivate() throws Exception
- {
- super.doActivate();
- sendQueue = new SendQueue();
- if (receiveExecutor == null)
- {
- receiveSerializer = new SynchronousWorkSerializer();
- }
- else
- {
- // CompletionWorkSerializer throws "One command already pending"
- // receiveSerializer = new CompletionWorkSerializer();
- // receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor);
- // receiveSerializer = new SynchronousWorkSerializer();
-
- class ChannelReceiveSerializer extends QueueWorkerWorkSerializer
- {
- @Override
- protected String getThreadName()
- {
- return "ReceiveSerializer-" + Channel.this; //$NON-NLS-1$
- }
- }
-
- receiveSerializer = new ChannelReceiveSerializer();
- }
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- unregisterFromMultiplexer();
- if (receiveSerializer != null)
- {
- receiveSerializer.dispose();
- receiveSerializer = null;
- }
-
- if (sendQueue != null)
- {
- sendQueue.clear();
- sendQueue = null;
- }
-
- super.doDeactivate();
- }
-
- protected void unregisterFromMultiplexer()
- {
- channelMultiplexer.closeChannel(this);
- }
-
- public void close()
- {
- LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG);
- }
-
- public boolean isClosed()
- {
- return !isActive();
- }
-
- /**
- * @author Eike Stepper
- */
- protected class ReceiverWork implements Runnable
- {
- private final IBuffer buffer;
-
- /**
- * @since 3.0
- */
- public ReceiverWork(IBuffer buffer)
- {
- this.buffer = buffer;
- }
-
- public void run()
- {
- IBufferHandler receiveHandler = getReceiveHandler();
- if (receiveHandler != null)
- {
- receiveHandler.handleBuffer(buffer);
- }
- else
- {
- // Shutting down
- buffer.release();
- }
- }
- }
-
- /**
- * A queue that holds buffers that shall be sent. This implementation notifies observers of enqueued and dequeued
- * buffers. The notification's deliberately not synchronized. It shall only be used by O&M tooling to offer (not 100%
- * accurate) statistical insights
- *
- * @author Eike Stepper
- * @since 3.0
- */
- protected class SendQueue extends ConcurrentLinkedQueue<IBuffer>
- {
- private static final long serialVersionUID = 1L;
-
- private AtomicInteger size = new AtomicInteger();
-
- protected SendQueue()
- {
- }
-
- @Override
- public boolean add(IBuffer o)
- {
- super.add(o);
- added();
- return true;
- }
-
- @Override
- public boolean offer(IBuffer o)
- {
- super.offer(o);
- added();
- return true;
- }
-
- @Override
- public IBuffer poll()
- {
- IBuffer result = super.poll();
- if (result != null)
- {
- removed();
- }
-
- return result;
- }
-
- @Override
- public IBuffer remove()
- {
- IBuffer result = super.remove();
- if (result != null)
- {
- removed();
- }
-
- return result;
- }
-
- @Override
- public boolean remove(Object o)
- {
- boolean result = super.remove(o);
- if (result)
- {
- removed();
- }
-
- return result;
- }
-
- private void added()
- {
- int queueSize = size.incrementAndGet();
- IListener[] listeners = getListeners();
- if (listeners != null)
- {
- fireEvent(new SendQueueEventImpl(Type.ENQUEUED, queueSize), listeners);
- }
- }
-
- private void removed()
- {
- int queueSize = size.decrementAndGet();
- IListener[] listeners = getListeners();
- if (listeners != null)
- {
- fireEvent(new SendQueueEventImpl(Type.DEQUEUED, queueSize), listeners);
- }
- }
- }
-
- /**
- * @author Eike Stepper
- */
- private final class SendQueueEventImpl extends Event implements SendQueueEvent
- {
- private static final long serialVersionUID = 1L;
-
- private Type type;
-
- private final int queueSize;
-
- private SendQueueEventImpl(Type type, int queueSize)
- {
- super(Channel.this);
- this.type = type;
- this.queueSize = queueSize;
- }
-
- @Override
- public InternalChannel getSource()
- {
- return (InternalChannel)super.getSource();
- }
-
- public Type getType()
- {
- return type;
- }
-
- public int getQueueSize()
- {
- return queueSize;
- }
- }
-}
+/*
+ * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ * Andre Dietisheim - maintenance
+ */
+package org.eclipse.spi.net4j;
+
+import org.eclipse.net4j.buffer.BufferState;
+import org.eclipse.net4j.buffer.IBuffer;
+import org.eclipse.net4j.buffer.IBufferHandler;
+import org.eclipse.net4j.channel.IChannelMultiplexer;
+import org.eclipse.net4j.protocol.IProtocol;
+import org.eclipse.net4j.util.concurrent.IWorkSerializer;
+import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer;
+import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer;
+import org.eclipse.net4j.util.event.Event;
+import org.eclipse.net4j.util.event.IListener;
+import org.eclipse.net4j.util.lifecycle.Lifecycle;
+import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
+import org.eclipse.net4j.util.om.log.OMLogger;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
+
+import org.eclipse.internal.net4j.bundle.OM;
+
+import org.eclipse.spi.net4j.InternalChannel.SendQueueEvent.Type;
+
+import java.text.MessageFormat;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Eike Stepper
+ * @since 2.0
+ */
+public class Channel extends Lifecycle implements InternalChannel
+{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CHANNEL, Channel.class);
+
+ private String userID;
+
+ private InternalChannelMultiplexer channelMultiplexer;
+
+ private short id = IBuffer.NO_CHANNEL;
+
+ private ExecutorService receiveExecutor;
+
+ /**
+ * The external handler for buffers passed from the {@link #connector}.
+ */
+ private IBufferHandler receiveHandler;
+
+ private IWorkSerializer receiveSerializer;
+
+ private transient Queue<IBuffer> sendQueue;
+
+ private transient long sentBuffers;
+
+ private transient long receivedBuffers;
+
+ public Channel()
+ {
+ }
+
+ public String getUserID()
+ {
+ return userID;
+ }
+
+ public void setUserID(String userID)
+ {
+ this.userID = userID;
+ }
+
+ public Location getLocation()
+ {
+ return channelMultiplexer.getLocation();
+ }
+
+ public boolean isClient()
+ {
+ return channelMultiplexer.isClient();
+ }
+
+ public boolean isServer()
+ {
+ return channelMultiplexer.isServer();
+ }
+
+ public IChannelMultiplexer getMultiplexer()
+ {
+ return channelMultiplexer;
+ }
+
+ public void setMultiplexer(IChannelMultiplexer channelMultiplexer)
+ {
+ this.channelMultiplexer = (InternalChannelMultiplexer)channelMultiplexer;
+ }
+
+ public short getID()
+ {
+ return id;
+ }
+
+ public void setID(short id)
+ {
+ checkArg(id != IBuffer.NO_CHANNEL, "id == IBuffer.NO_CHANNEL"); //$NON-NLS-1$
+ this.id = id;
+ }
+
+ public ExecutorService getReceiveExecutor()
+ {
+ return receiveExecutor;
+ }
+
+ public void setReceiveExecutor(ExecutorService receiveExecutor)
+ {
+ this.receiveExecutor = receiveExecutor;
+ }
+
+ public IBufferHandler getReceiveHandler()
+ {
+ return receiveHandler;
+ }
+
+ public void setReceiveHandler(IBufferHandler receiveHandler)
+ {
+ this.receiveHandler = receiveHandler;
+ }
+
+ /**
+ * @since 3.0
+ */
+ public long getSentBuffers()
+ {
+ return sentBuffers;
+ }
+
+ /**
+ * @since 3.0
+ */
+ public long getReceivedBuffers()
+ {
+ return receivedBuffers;
+ }
+
+ public Queue<IBuffer> getSendQueue()
+ {
+ return sendQueue;
+ }
+
+ public void sendBuffer(IBuffer buffer)
+ {
+ handleBuffer(buffer);
+ }
+
+ /**
+ * Handles the given buffer. Ensures it is in the PUTTING state (otherwise ignores it) and sends it on behalf of the
+ * send queue.
+ *
+ * @see IBuffer#getState
+ * @see BufferState#PUTTING
+ * @see Channel#sendQueue
+ */
+ public void handleBuffer(IBuffer buffer)
+ {
+ BufferState state = buffer.getState();
+ if (state != BufferState.PUTTING)
+ {
+ OM.LOG.warn("Ignoring buffer in state == " + state + ": " + this); //$NON-NLS-1$ //$NON-NLS-2$
+ return;
+ }
+
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Handling buffer: {0} --> {1}", buffer, this); //$NON-NLS-1$
+ }
+
+ if (sendQueue == null)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Ignoring buffer because sendQueue == null: " + this); //$NON-NLS-1$
+ }
+
+ buffer.release();
+ }
+ else
+ {
+ sendQueue.add(buffer);
+ ++sentBuffers;
+ channelMultiplexer.multiplexChannel(this);
+ }
+ }
+
+ /**
+ * Handles a buffer sent by the multiplexer. Adds work to the receive queue or releases the buffer.
+ *
+ * @see InternalChannelMultiplexer#multiplexChannel
+ * @see IWorkSerializer
+ * @see ReceiverWork
+ */
+ public void handleBufferFromMultiplexer(IBuffer buffer)
+ {
+ if (receiveHandler != null)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Handling buffer from multiplexer: {0} --> {1}", buffer, this); //$NON-NLS-1$
+ }
+
+ ++receivedBuffers;
+ receiveSerializer.addWork(createReceiverWork(buffer));
+ }
+ else
+ {
+ // Shutting down
+ buffer.release();
+ }
+ }
+
+ protected ReceiverWork createReceiverWork(IBuffer buffer)
+ {
+ return new ReceiverWork(buffer);
+ }
+
+ public short getBufferCapacity()
+ {
+ return channelMultiplexer.getBufferCapacity();
+ }
+
+ public IBuffer provideBuffer()
+ {
+ return channelMultiplexer.provideBuffer();
+ }
+
+ public void retainBuffer(IBuffer buffer)
+ {
+ channelMultiplexer.retainBuffer(buffer);
+ }
+
+ @Override
+ public String toString()
+ {
+ if (receiveHandler instanceof IProtocol)
+ {
+ IProtocol<?> protocol = (IProtocol<?>)receiveHandler;
+ return MessageFormat.format("Channel[{0}, {1}, {2}]", id, getLocation(), protocol.getType()); //$NON-NLS-1$
+ }
+
+ return MessageFormat.format("Channel[{0}, {1}]", id, getLocation()); //$NON-NLS-1$
+ }
+
+ @Override
+ protected void doBeforeActivate() throws Exception
+ {
+ super.doBeforeActivate();
+ checkState(id != IBuffer.NO_CHANNEL, "channelID == NO_CHANNEL"); //$NON-NLS-1$
+ checkState(channelMultiplexer, "channelMultiplexer"); //$NON-NLS-1$
+ }
+
+ @Override
+ protected void doActivate() throws Exception
+ {
+ super.doActivate();
+ sendQueue = new SendQueue();
+ if (receiveExecutor == null)
+ {
+ receiveSerializer = new SynchronousWorkSerializer();
+ }
+ else
+ {
+ receiveSerializer = new ReceiveSerializer();
+ }
+ }
+
+ @Override
+ protected void doDeactivate() throws Exception
+ {
+ unregisterFromMultiplexer();
+ if (receiveSerializer != null)
+ {
+ receiveSerializer.dispose();
+ receiveSerializer = null;
+ }
+
+ if (sendQueue != null)
+ {
+ sendQueue.clear();
+ sendQueue = null;
+ }
+
+ super.doDeactivate();
+ }
+
+ protected void unregisterFromMultiplexer()
+ {
+ channelMultiplexer.closeChannel(this);
+ }
+
+ public void close()
+ {
+ LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG);
+ }
+
+ public boolean isClosed()
+ {
+ return !isActive();
+ }
+
+ /**
+ * @author Eike Stepper
+ * @since 4.1
+ */
+ protected class ReceiveSerializer extends QueueWorkerWorkSerializer
+ {
+ // CompletionWorkSerializer throws "One command already pending"
+ // CompletionWorkSerializer
+ // AsynchronousWorkSerializer
+ // SynchronousWorkSerializer
+
+ @Override
+ protected String getThreadName()
+ {
+ return "ReceiveSerializer-" + Channel.this; //$NON-NLS-1$
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ protected class ReceiverWork implements Runnable
+ {
+ private final IBuffer buffer;
+
+ /**
+ * @since 3.0
+ */
+ public ReceiverWork(IBuffer buffer)
+ {
+ this.buffer = buffer;
+ }
+
+ public void run()
+ {
+ IBufferHandler receiveHandler = getReceiveHandler();
+ if (receiveHandler != null)
+ {
+ receiveHandler.handleBuffer(buffer);
+ }
+ else
+ {
+ // Shutting down
+ buffer.release();
+ }
+ }
+ }
+
+ /**
+ * A queue that holds buffers that shall be sent. This implementation notifies observers of enqueued and dequeued
+ * buffers. The notification's deliberately not synchronized. It shall only be used by O&M tooling to offer (not 100%
+ * accurate) statistical insights
+ *
+ * @author Eike Stepper
+ * @since 3.0
+ */
+ protected class SendQueue extends ConcurrentLinkedQueue<IBuffer>
+ {
+ private static final long serialVersionUID = 1L;
+
+ private AtomicInteger size = new AtomicInteger();
+
+ protected SendQueue()
+ {
+ }
+
+ @Override
+ public boolean add(IBuffer o)
+ {
+ super.add(o);
+ added();
+ return true;
+ }
+
+ @Override
+ public boolean offer(IBuffer o)
+ {
+ super.offer(o);
+ added();
+ return true;
+ }
+
+ @Override
+ public IBuffer poll()
+ {
+ IBuffer result = super.poll();
+ if (result != null)
+ {
+ removed();
+ }
+
+ return result;
+ }
+
+ @Override
+ public IBuffer remove()
+ {
+ IBuffer result = super.remove();
+ if (result != null)
+ {
+ removed();
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean remove(Object o)
+ {
+ boolean result = super.remove(o);
+ if (result)
+ {
+ removed();
+ }
+
+ return result;
+ }
+
+ private void added()
+ {
+ int queueSize = size.incrementAndGet();
+ IListener[] listeners = getListeners();
+ if (listeners != null)
+ {
+ fireEvent(new SendQueueEventImpl(Type.ENQUEUED, queueSize), listeners);
+ }
+ }
+
+ private void removed()
+ {
+ int queueSize = size.decrementAndGet();
+ IListener[] listeners = getListeners();
+ if (listeners != null)
+ {
+ fireEvent(new SendQueueEventImpl(Type.DEQUEUED, queueSize), listeners);
+ }
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private final class SendQueueEventImpl extends Event implements SendQueueEvent
+ {
+ private static final long serialVersionUID = 1L;
+
+ private Type type;
+
+ private final int queueSize;
+
+ private SendQueueEventImpl(Type type, int queueSize)
+ {
+ super(Channel.this);
+ this.type = type;
+ this.queueSize = queueSize;
+ }
+
+ @Override
+ public InternalChannel getSource()
+ {
+ return (InternalChannel)super.getSource();
+ }
+
+ public Type getType()
+ {
+ return type;
+ }
+
+ public int getQueueSize()
+ {
+ return queueSize;
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
index 7187174de4..b3f0341f55 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
@@ -1,373 +1,372 @@
-/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.spi.net4j;
-
-import org.eclipse.net4j.ITransportConfig;
-import org.eclipse.net4j.Net4jUtil;
-import org.eclipse.net4j.buffer.IBuffer;
-import org.eclipse.net4j.channel.ChannelException;
-import org.eclipse.net4j.channel.IChannel;
-import org.eclipse.net4j.channel.IChannelMultiplexer;
-import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.protocol.IProtocolProvider;
-import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
-import org.eclipse.net4j.util.StringUtil;
-import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException;
-import org.eclipse.net4j.util.container.Container;
-import org.eclipse.net4j.util.factory.FactoryKey;
-import org.eclipse.net4j.util.factory.IFactoryKey;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-import org.eclipse.net4j.util.security.INegotiationContext;
-
-import org.eclipse.internal.net4j.TransportConfig;
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public abstract class ChannelMultiplexer extends Container<IChannel> implements InternalChannelMultiplexer
-{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, ChannelMultiplexer.class);
-
- private ITransportConfig config;
-
- private long openChannelTimeout = IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT;
-
- @ExcludeFromDump
- private transient ConcurrentMap<Short, IChannel> channels = new ConcurrentHashMap<Short, IChannel>();
-
- @ExcludeFromDump
- private transient Set<Short> channelIDs = new HashSet<Short>();
-
- @ExcludeFromDump
- private transient int lastChannelID;
-
- public ChannelMultiplexer()
- {
- }
-
- public synchronized ITransportConfig getConfig()
- {
- if (config == null)
- {
- config = new TransportConfig(this);
- }
-
- return config;
- }
-
- public synchronized void setConfig(ITransportConfig config)
- {
- checkInactive();
- this.config = Net4jUtil.copyTransportConfig(this, config);
- }
-
- public long getOpenChannelTimeout()
- {
- if (openChannelTimeout == IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT)
- {
- return OM.BUNDLE.getDebugSupport().getDebugOption("open.channel.timeout", 10000); //$NON-NLS-1$
- }
-
- return openChannelTimeout;
- }
-
- public void setOpenChannelTimeout(long openChannelTimeout)
- {
- this.openChannelTimeout = openChannelTimeout;
- }
-
- public final InternalChannel getChannel(short channelID)
- {
- return (InternalChannel)channels.get(channelID);
- }
-
- public final Collection<IChannel> getChannels()
- {
- return channels.values();
- }
-
- @Override
- public boolean isEmpty()
- {
- return channels.isEmpty();
- }
-
- public IChannel[] getElements()
- {
- List<IChannel> list = new ArrayList<IChannel>(getChannels());
- return list.toArray(new IChannel[list.size()]);
- }
-
- public InternalChannel openChannel() throws ChannelException
- {
- return openChannel((IProtocol<?>)null);
- }
-
- public InternalChannel openChannel(String protocolID, Object infraStructure) throws ChannelException
- {
- IProtocol<?> protocol = createProtocol(protocolID, infraStructure);
- if (protocol == null)
- {
- throw new IllegalArgumentException("Unknown protocolID: " + protocolID); //$NON-NLS-1$
- }
-
- return openChannel(protocol);
- }
-
- public InternalChannel openChannel(IProtocol<?> protocol) throws ChannelException
- {
- long start = System.currentTimeMillis();
- doBeforeOpenChannel(protocol);
-
- InternalChannel channel = createChannel();
- initChannel(channel, protocol);
- channel.setID(getNextChannelID());
- addChannel(channel);
-
- try
- {
- try
- {
- long timeout = getOpenChannelTimeout() - System.currentTimeMillis() + start;
- if (timeout <= 0)
- {
- throw new TimeoutRuntimeException();
- }
-
- registerChannelWithPeer(channel.getID(), timeout, protocol);
- }
- catch (TimeoutRuntimeException ex)
- {
- // Adjust the message for the complete timeout time
- String message = "Channel registration timeout after " + getOpenChannelTimeout() + " milliseconds";
- throw new TimeoutRuntimeException(message, ex);
- }
- }
- catch (ChannelException ex)
- {
- throw ex;
- }
- catch (Exception ex)
- {
- throw new ChannelException(ex);
- }
-
- return channel;
- }
-
- public InternalChannel inverseOpenChannel(short channelID, String protocolID)
- {
- IProtocol<?> protocol = createProtocol(protocolID, null);
-
- InternalChannel channel = createChannel();
- initChannel(channel, protocol);
- channel.setID(channelID);
- addChannel(channel);
- return channel;
- }
-
- public void closeChannel(InternalChannel channel) throws ChannelException
- {
- InternalChannel internalChannel = channel;
- deregisterChannelFromPeer(internalChannel);
- removeChannel(internalChannel);
- }
-
- public void inverseCloseChannel(short channelID) throws ChannelException
- {
- InternalChannel channel = getChannel(channelID);
- LifecycleUtil.deactivate(channel);
- }
-
- protected InternalChannel createChannel()
- {
- return new Channel();
- }
-
- protected void initChannel(InternalChannel channel, IProtocol<?> protocol)
- {
- channel.setMultiplexer(this);
- channel.setReceiveExecutor(getConfig().getReceiveExecutor());
- if (protocol != null)
- {
- protocol.setChannel(channel);
- LifecycleUtil.activate(protocol);
- if (TRACER.isEnabled())
- {
- String protocolType = protocol.getType();
- TRACER.format("Opening channel with protocol {0}", protocolType); //$NON-NLS-1$
- }
-
- channel.setReceiveHandler(protocol);
- }
- else
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Opening channel without protocol"); //$NON-NLS-1$
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- protected <INFRA_STRUCTURE> IProtocol<INFRA_STRUCTURE> createProtocol(String type, INFRA_STRUCTURE infraStructure)
- {
- if (StringUtil.isEmpty(type))
- {
- return null;
- }
-
- IProtocolProvider protocolProvider = getConfig().getProtocolProvider();
- if (protocolProvider == null)
- {
- throw new ChannelException("No protocol provider configured"); //$NON-NLS-1$
- }
-
- IProtocol<INFRA_STRUCTURE> protocol = (IProtocol<INFRA_STRUCTURE>)protocolProvider.getProtocol(type);
- if (protocol == null)
- {
- throw new ChannelException("Invalid protocol factory: " + type); //$NON-NLS-1$
- }
-
- if (infraStructure != null)
- {
- protocol.setInfraStructure(infraStructure);
- }
-
- return protocol;
- }
-
- protected IFactoryKey createProtocolFactoryKey(String type)
- {
- switch (getLocation())
- {
- case SERVER:
- return new FactoryKey(ServerProtocolFactory.PRODUCT_GROUP, type);
- case CLIENT:
- return new FactoryKey(ClientProtocolFactory.PRODUCT_GROUP, type);
- default:
- throw new IllegalStateException();
- }
- }
-
- protected void doBeforeOpenChannel(IProtocol<?> protocol)
- {
- // Do nothing
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- IChannel[] channels;
- synchronized (channelIDs)
- {
- channels = getElements();
- }
-
- for (IChannel channel : channels)
- {
- LifecycleUtil.deactivate(channel);
- }
-
- synchronized (channelIDs)
- {
- this.channels.clear();
- }
-
- super.doDeactivate();
- }
-
- protected abstract INegotiationContext createNegotiationContext();
-
- protected abstract void registerChannelWithPeer(short channelID, long timeout, IProtocol<?> protocol)
- throws ChannelException;
-
- protected abstract void deregisterChannelFromPeer(InternalChannel channel) throws ChannelException;
-
- private short getNextChannelID()
- {
- synchronized (channelIDs)
- {
- int start = lastChannelID;
- int maxValue = Short.MAX_VALUE;
- for (;;)
- {
- ++lastChannelID;
- if (lastChannelID == start)
- {
- throw new ChannelException("Too many channels"); //$NON-NLS-1$
- }
-
- if (lastChannelID > maxValue)
- {
- lastChannelID = 1;
- }
-
- short id = (short)(isClient() ? lastChannelID : -lastChannelID);
- if (channelIDs.add(id))
- {
- return id;
- }
- }
- }
- }
-
- private void addChannel(InternalChannel channel)
- {
- short channelID = channel.getID();
- if (channelID == RESERVED_CHANNEL || channelID == IBuffer.NO_CHANNEL)
- {
- throw new ChannelException("Invalid channel ID: " + channelID); //$NON-NLS-1$
- }
-
- channels.put(channelID, channel);
- LifecycleUtil.activate(channel);
- fireElementAddedEvent(channel);
- }
-
- private void removeChannel(InternalChannel channel)
- {
- try
- {
- short channelID = channel.getID();
- boolean removed;
- synchronized (channelIDs)
- {
- removed = channels.remove(channelID) != null;
- if (removed)
- {
- channelIDs.remove(channelID);
- }
- }
-
- if (removed)
- {
- fireElementRemovedEvent(channel);
- }
- }
- catch (RuntimeException ex)
- {
- OM.LOG.error(ex);
- throw ex;
- }
- }
-}
+/*
+ * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.spi.net4j;
+
+import org.eclipse.net4j.ITransportConfig;
+import org.eclipse.net4j.Net4jUtil;
+import org.eclipse.net4j.buffer.IBuffer;
+import org.eclipse.net4j.channel.ChannelException;
+import org.eclipse.net4j.channel.IChannel;
+import org.eclipse.net4j.channel.IChannelMultiplexer;
+import org.eclipse.net4j.protocol.IProtocol;
+import org.eclipse.net4j.protocol.IProtocolProvider;
+import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
+import org.eclipse.net4j.util.StringUtil;
+import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException;
+import org.eclipse.net4j.util.container.Container;
+import org.eclipse.net4j.util.factory.FactoryKey;
+import org.eclipse.net4j.util.factory.IFactoryKey;
+import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
+import org.eclipse.net4j.util.security.INegotiationContext;
+
+import org.eclipse.internal.net4j.TransportConfig;
+import org.eclipse.internal.net4j.bundle.OM;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * @author Eike Stepper
+ * @since 2.0
+ */
+public abstract class ChannelMultiplexer extends Container<IChannel> implements InternalChannelMultiplexer
+{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, ChannelMultiplexer.class);
+
+ private ITransportConfig config;
+
+ private long openChannelTimeout = IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT;
+
+ private ConcurrentMap<Short, IChannel> channels = new ConcurrentHashMap<Short, IChannel>();
+
+ @ExcludeFromDump
+ private transient Set<Short> channelIDs = new HashSet<Short>();
+
+ @ExcludeFromDump
+ private transient int lastChannelID;
+
+ public ChannelMultiplexer()
+ {
+ }
+
+ public synchronized ITransportConfig getConfig()
+ {
+ if (config == null)
+ {
+ config = new TransportConfig(this);
+ }
+
+ return config;
+ }
+
+ public synchronized void setConfig(ITransportConfig config)
+ {
+ checkInactive();
+ this.config = Net4jUtil.copyTransportConfig(this, config);
+ }
+
+ public long getOpenChannelTimeout()
+ {
+ if (openChannelTimeout == IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT)
+ {
+ return OM.BUNDLE.getDebugSupport().getDebugOption("open.channel.timeout", 10000); //$NON-NLS-1$
+ }
+
+ return openChannelTimeout;
+ }
+
+ public void setOpenChannelTimeout(long openChannelTimeout)
+ {
+ this.openChannelTimeout = openChannelTimeout;
+ }
+
+ public final InternalChannel getChannel(short channelID)
+ {
+ return (InternalChannel)channels.get(channelID);
+ }
+
+ public final Collection<IChannel> getChannels()
+ {
+ return channels.values();
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return channels.isEmpty();
+ }
+
+ public IChannel[] getElements()
+ {
+ List<IChannel> list = new ArrayList<IChannel>(getChannels());
+ return list.toArray(new IChannel[list.size()]);
+ }
+
+ public InternalChannel openChannel() throws ChannelException
+ {
+ return openChannel((IProtocol<?>)null);
+ }
+
+ public InternalChannel openChannel(String protocolID, Object infraStructure) throws ChannelException
+ {
+ IProtocol<?> protocol = createProtocol(protocolID, infraStructure);
+ if (protocol == null)
+ {
+ throw new IllegalArgumentException("Unknown protocolID: " + protocolID); //$NON-NLS-1$
+ }
+
+ return openChannel(protocol);
+ }
+
+ public InternalChannel openChannel(IProtocol<?> protocol) throws ChannelException
+ {
+ long start = System.currentTimeMillis();
+ doBeforeOpenChannel(protocol);
+
+ InternalChannel channel = createChannel();
+ initChannel(channel, protocol);
+ channel.setID(getNextChannelID());
+ addChannel(channel);
+
+ try
+ {
+ try
+ {
+ long timeout = getOpenChannelTimeout() - System.currentTimeMillis() + start;
+ if (timeout <= 0)
+ {
+ throw new TimeoutRuntimeException();
+ }
+
+ registerChannelWithPeer(channel.getID(), timeout, protocol);
+ }
+ catch (TimeoutRuntimeException ex)
+ {
+ // Adjust the message for the complete timeout time
+ String message = "Channel registration timeout after " + getOpenChannelTimeout() + " milliseconds";
+ throw new TimeoutRuntimeException(message, ex);
+ }
+ }
+ catch (ChannelException ex)
+ {
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ throw new ChannelException(ex);
+ }
+
+ return channel;
+ }
+
+ public InternalChannel inverseOpenChannel(short channelID, String protocolID)
+ {
+ IProtocol<?> protocol = createProtocol(protocolID, null);
+
+ InternalChannel channel = createChannel();
+ initChannel(channel, protocol);
+ channel.setID(channelID);
+ addChannel(channel);
+ return channel;
+ }
+
+ public void closeChannel(InternalChannel channel) throws ChannelException
+ {
+ InternalChannel internalChannel = channel;
+ deregisterChannelFromPeer(internalChannel);
+ removeChannel(internalChannel);
+ }
+
+ public void inverseCloseChannel(short channelID) throws ChannelException
+ {
+ InternalChannel channel = getChannel(channelID);
+ LifecycleUtil.deactivate(channel);
+ }
+
+ protected InternalChannel createChannel()
+ {
+ return new Channel();
+ }
+
+ protected void initChannel(InternalChannel channel, IProtocol<?> protocol)
+ {
+ channel.setMultiplexer(this);
+ channel.setReceiveExecutor(getConfig().getReceiveExecutor());
+ if (protocol != null)
+ {
+ protocol.setChannel(channel);
+ LifecycleUtil.activate(protocol);
+ if (TRACER.isEnabled())
+ {
+ String protocolType = protocol.getType();
+ TRACER.format("Opening channel with protocol {0}", protocolType); //$NON-NLS-1$
+ }
+
+ channel.setReceiveHandler(protocol);
+ }
+ else
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Opening channel without protocol"); //$NON-NLS-1$
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <INFRA_STRUCTURE> IProtocol<INFRA_STRUCTURE> createProtocol(String type, INFRA_STRUCTURE infraStructure)
+ {
+ if (StringUtil.isEmpty(type))
+ {
+ return null;
+ }
+
+ IProtocolProvider protocolProvider = getConfig().getProtocolProvider();
+ if (protocolProvider == null)
+ {
+ throw new ChannelException("No protocol provider configured"); //$NON-NLS-1$
+ }
+
+ IProtocol<INFRA_STRUCTURE> protocol = (IProtocol<INFRA_STRUCTURE>)protocolProvider.getProtocol(type);
+ if (protocol == null)
+ {
+ throw new ChannelException("Invalid protocol factory: " + type); //$NON-NLS-1$
+ }
+
+ if (infraStructure != null)
+ {
+ protocol.setInfraStructure(infraStructure);
+ }
+
+ return protocol;
+ }
+
+ protected IFactoryKey createProtocolFactoryKey(String type)
+ {
+ switch (getLocation())
+ {
+ case SERVER:
+ return new FactoryKey(ServerProtocolFactory.PRODUCT_GROUP, type);
+ case CLIENT:
+ return new FactoryKey(ClientProtocolFactory.PRODUCT_GROUP, type);
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ protected void doBeforeOpenChannel(IProtocol<?> protocol)
+ {
+ // Do nothing
+ }
+
+ @Override
+ protected void doDeactivate() throws Exception
+ {
+ IChannel[] channels;
+ synchronized (channelIDs)
+ {
+ channels = getElements();
+ }
+
+ for (IChannel channel : channels)
+ {
+ LifecycleUtil.deactivate(channel);
+ }
+
+ synchronized (channelIDs)
+ {
+ this.channels.clear();
+ }
+
+ super.doDeactivate();
+ }
+
+ protected abstract INegotiationContext createNegotiationContext();
+
+ protected abstract void registerChannelWithPeer(short channelID, long timeout, IProtocol<?> protocol)
+ throws ChannelException;
+
+ protected abstract void deregisterChannelFromPeer(InternalChannel channel) throws ChannelException;
+
+ private short getNextChannelID()
+ {
+ synchronized (channelIDs)
+ {
+ int start = lastChannelID;
+ int maxValue = Short.MAX_VALUE;
+ for (;;)
+ {
+ ++lastChannelID;
+ if (lastChannelID == start)
+ {
+ throw new ChannelException("Too many channels"); //$NON-NLS-1$
+ }
+
+ if (lastChannelID > maxValue)
+ {
+ lastChannelID = 1;
+ }
+
+ short id = (short)(isClient() ? lastChannelID : -lastChannelID);
+ if (channelIDs.add(id))
+ {
+ return id;
+ }
+ }
+ }
+ }
+
+ private void addChannel(InternalChannel channel)
+ {
+ short channelID = channel.getID();
+ if (channelID == RESERVED_CHANNEL || channelID == IBuffer.NO_CHANNEL)
+ {
+ throw new ChannelException("Invalid channel ID: " + channelID); //$NON-NLS-1$
+ }
+
+ channels.put(channelID, channel);
+ LifecycleUtil.activate(channel);
+ fireElementAddedEvent(channel);
+ }
+
+ private void removeChannel(InternalChannel channel)
+ {
+ try
+ {
+ short channelID = channel.getID();
+ boolean removed;
+ synchronized (channelIDs)
+ {
+ removed = channels.remove(channelID) != null;
+ if (removed)
+ {
+ channelIDs.remove(channelID);
+ }
+ }
+
+ if (removed)
+ {
+ fireElementRemovedEvent(channel);
+ }
+ }
+ catch (RuntimeException ex)
+ {
+ OM.LOG.error(ex);
+ throw ex;
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java
index 424e580e2e..c33de7bdc9 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java
@@ -1,430 +1,433 @@
-/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.spi.net4j;
-
-import org.eclipse.net4j.buffer.IBuffer;
-import org.eclipse.net4j.channel.ChannelException;
-import org.eclipse.net4j.connector.ConnectorException;
-import org.eclipse.net4j.connector.ConnectorState;
-import org.eclipse.net4j.connector.IConnector;
-import org.eclipse.net4j.connector.IConnectorStateEvent;
-import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
-import org.eclipse.net4j.util.event.Event;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.log.OMLogger;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-import org.eclipse.net4j.util.security.INegotiationContext;
-import org.eclipse.net4j.util.security.INegotiator;
-import org.eclipse.net4j.util.security.NegotiationException;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.text.MessageFormat;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public abstract class Connector extends ChannelMultiplexer implements InternalConnector
-{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, Connector.class);
-
- private String userID;
-
- private transient ConnectorState connectorState = ConnectorState.DISCONNECTED;
-
- @ExcludeFromDump
- private transient CountDownLatch finishedConnecting;
-
- @ExcludeFromDump
- private transient CountDownLatch finishedNegotiating;
-
- @ExcludeFromDump
- private transient INegotiationContext negotiationContext;
-
- @ExcludeFromDump
- private transient NegotiationException negotiationException;
-
- public Connector()
- {
- }
-
- public INegotiator getNegotiator()
- {
- return getConfig().getNegotiator();
- }
-
- public void setNegotiator(INegotiator negotiator)
- {
- getConfig().setNegotiator(negotiator);
- }
-
- public INegotiationContext getNegotiationContext()
- {
- return negotiationContext;
- }
-
- public boolean isClient()
- {
- return getLocation() == Location.CLIENT;
- }
-
- public boolean isServer()
- {
- return getLocation() == Location.SERVER;
- }
-
- public String getUserID()
- {
- return userID;
- }
-
- public void setUserID(String userID)
- {
- checkState(getState() != ConnectorState.CONNECTED, "Connector is already connected"); //$NON-NLS-1$
- if (TRACER.isEnabled())
- {
- TRACER.format("Setting userID {0} for {1}", userID, this); //$NON-NLS-1$
- }
-
- this.userID = userID;
- }
-
- public ConnectorState getState()
- {
- return connectorState;
- }
-
- public void setState(ConnectorState newState) throws ConnectorException
- {
- ConnectorState oldState = getState();
- if (newState != oldState)
- {
- if (TRACER.isEnabled())
- {
- TRACER.format("Setting state {0} (was {1}) for {2}", newState, oldState.toString().toLowerCase(), this); //$NON-NLS-1$
- }
-
- connectorState = newState;
- switch (newState)
- {
- case DISCONNECTED:
- if (finishedConnecting != null)
- {
- finishedConnecting.countDown();
- finishedConnecting = null;
- }
-
- if (finishedNegotiating != null)
- {
- finishedNegotiating.countDown();
- finishedNegotiating = null;
- }
-
- break;
-
- case CONNECTING:
- finishedConnecting = new CountDownLatch(1);
- finishedNegotiating = new CountDownLatch(1);
- // The concrete implementation must advance state to NEGOTIATING or CONNECTED
- break;
-
- case NEGOTIATING:
- finishedConnecting.countDown();
- negotiationContext = createNegotiationContext();
- getNegotiator().negotiate(negotiationContext);
- break;
-
- case CONNECTED:
- negotiationContext = null;
- deferredActivate(true);
- finishedConnecting.countDown();
- finishedNegotiating.countDown();
- break;
- }
-
- fireEvent(new ConnectorStateEvent(this, oldState, newState));
- }
- }
-
- public boolean isDisconnected()
- {
- return connectorState == ConnectorState.DISCONNECTED;
- }
-
- public boolean isConnecting()
- {
- return connectorState == ConnectorState.CONNECTING;
- }
-
- public boolean isNegotiating()
- {
- return connectorState == ConnectorState.NEGOTIATING;
- }
-
- public boolean isConnected()
- {
- if (negotiationException != null)
- {
- throw new ConnectorException("Connector negotiation failed", negotiationException); //$NON-NLS-1$
- }
-
- return connectorState == ConnectorState.CONNECTED;
- }
-
- public void connectAsync() throws ConnectorException
- {
- try
- {
- activate();
- }
- catch (ConnectorException ex)
- {
- throw ex;
- }
- catch (Exception ex)
- {
- throw new ConnectorException(ex);
- }
- }
-
- /**
- * @since 4.0
- */
- public void waitForConnection(long timeout) throws ConnectorException
- {
- String message = "Connection timeout after " + timeout + " milliseconds";
- final long MAX_POLL_INTERVAL = 100L;
- boolean withTimeout = timeout != NO_TIMEOUT;
-
- try
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Waiting for connection..."); //$NON-NLS-1$
- }
-
- for (;;)
- {
- long t = MAX_POLL_INTERVAL;
- if (withTimeout)
- {
- t = Math.min(MAX_POLL_INTERVAL, timeout);
- timeout -= MAX_POLL_INTERVAL;
- }
-
- if (t <= 0)
- {
- break;
- }
-
- if (finishedNegotiating == null)
- {
- break;
- }
-
- if (finishedNegotiating.await(t, TimeUnit.MILLISECONDS))
- {
- break;
- }
- }
-
- if (!isConnected())
- {
- throw new ConnectorException(message);
- }
- }
- catch (ConnectorException ex)
- {
- setState(ConnectorState.DISCONNECTED);
- throw ex;
- }
- catch (Exception ex)
- {
- setState(ConnectorState.DISCONNECTED);
- throw new ConnectorException(ex);
- }
- }
-
- /**
- * @since 4.0
- */
- public void connect(long timeout) throws ConnectorException
- {
- connectAsync();
- waitForConnection(timeout);
- }
-
- /**
- * @since 4.0
- */
- public void connect() throws ConnectorException
- {
- connect(NO_TIMEOUT);
- }
-
- public void close()
- {
- LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG);
- }
-
- public boolean isClosed()
- {
- return !isActive();
- }
-
- public short getBufferCapacity()
- {
- return getConfig().getBufferProvider().getBufferCapacity();
- }
-
- public IBuffer provideBuffer()
- {
- return getConfig().getBufferProvider().provideBuffer();
- }
-
- public void retainBuffer(IBuffer buffer)
- {
- getConfig().getBufferProvider().retainBuffer(buffer);
- }
-
- protected void leaveConnecting()
- {
- if (getNegotiator() == null)
- {
- setState(ConnectorState.CONNECTED);
- }
- else
- {
- setState(ConnectorState.NEGOTIATING);
- }
- }
-
- @Override
- protected abstract INegotiationContext createNegotiationContext();
-
- protected NegotiationException getNegotiationException()
- {
- return negotiationException;
- }
-
- protected void setNegotiationException(NegotiationException negotiationException)
- {
- this.negotiationException = negotiationException;
- }
-
- @Override
- protected void initChannel(InternalChannel channel, IProtocol<?> protocol)
- {
- super.initChannel(channel, protocol);
- channel.setUserID(getUserID());
- }
-
- @Override
- protected void deregisterChannelFromPeer(InternalChannel channel) throws ChannelException
- {
- }
-
- public Location getLocation()
- {
- return null;
- }
-
- public String getURL()
- {
- return null;
- }
-
- @Override
- protected boolean isDeferredActivation()
- {
- return true;
- }
-
- @Override
- protected void doBeforeOpenChannel(IProtocol<?> protocol)
- {
- super.doBeforeOpenChannel(protocol);
- long timeout = getOpenChannelTimeout();
- waitForConnection(timeout);
- }
-
- @Override
- protected void doBeforeActivate() throws Exception
- {
- super.doBeforeActivate();
- checkState(getConfig().getBufferProvider(), "getConfig().getBufferProvider()"); //$NON-NLS-1$
-
- if (userID != null && getConfig().getNegotiator() == null)
- {
- throw new IllegalStateException("A user ID on this connector requires a negotiator"); //$NON-NLS-1$
- }
- }
-
- @Override
- protected void doActivate() throws Exception
- {
- super.doActivate();
- setState(ConnectorState.CONNECTING);
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- setState(ConnectorState.DISCONNECTED);
- super.doDeactivate();
- }
-
- /**
- * @author Eike Stepper
- */
- private static class ConnectorStateEvent extends Event implements IConnectorStateEvent
- {
- private static final long serialVersionUID = 1L;
-
- private ConnectorState oldState;
-
- private ConnectorState newState;
-
- public ConnectorStateEvent(IConnector source, ConnectorState oldState, ConnectorState newState)
- {
- super(source);
- this.oldState = oldState;
- this.newState = newState;
- }
-
- @Override
- public IConnector getSource()
- {
- return (IConnector)super.getSource();
- }
-
- public ConnectorState getOldState()
- {
- return oldState;
- }
-
- public ConnectorState getNewState()
- {
- return newState;
- }
-
- @Override
- public String toString()
- {
- return MessageFormat.format("ConnectorStateEvent[source={0}, oldState={1}, newState={2}]", getSource(), //$NON-NLS-1$
- getOldState(), getNewState());
- }
- }
-}
+/*
+ * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.spi.net4j;
+
+import org.eclipse.net4j.buffer.IBuffer;
+import org.eclipse.net4j.channel.ChannelException;
+import org.eclipse.net4j.connector.ConnectorException;
+import org.eclipse.net4j.connector.ConnectorState;
+import org.eclipse.net4j.connector.IConnector;
+import org.eclipse.net4j.connector.IConnectorStateEvent;
+import org.eclipse.net4j.protocol.IProtocol;
+import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
+import org.eclipse.net4j.util.event.Event;
+import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
+import org.eclipse.net4j.util.om.log.OMLogger;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
+import org.eclipse.net4j.util.security.INegotiationContext;
+import org.eclipse.net4j.util.security.INegotiator;
+import org.eclipse.net4j.util.security.NegotiationException;
+
+import org.eclipse.internal.net4j.bundle.OM;
+
+import java.text.MessageFormat;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Eike Stepper
+ * @since 2.0
+ */
+public abstract class Connector extends ChannelMultiplexer implements InternalConnector
+{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, Connector.class);
+
+ private String userID;
+
+ private transient ConnectorState connectorState = ConnectorState.DISCONNECTED;
+
+ @ExcludeFromDump
+ private transient CountDownLatch finishedConnecting;
+
+ @ExcludeFromDump
+ private transient CountDownLatch finishedNegotiating;
+
+ @ExcludeFromDump
+ private transient INegotiationContext negotiationContext;
+
+ @ExcludeFromDump
+ private transient NegotiationException negotiationException;
+
+ public Connector()
+ {
+ }
+
+ public INegotiator getNegotiator()
+ {
+ return getConfig().getNegotiator();
+ }
+
+ public void setNegotiator(INegotiator negotiator)
+ {
+ getConfig().setNegotiator(negotiator);
+ }
+
+ public INegotiationContext getNegotiationContext()
+ {
+ return negotiationContext;
+ }
+
+ public boolean isClient()
+ {
+ return getLocation() == Location.CLIENT;
+ }
+
+ public boolean isServer()
+ {
+ return getLocation() == Location.SERVER;
+ }
+
+ public String getUserID()
+ {
+ return userID;
+ }
+
+ public void setUserID(String userID)
+ {
+ checkState(getState() != ConnectorState.CONNECTED, "Connector is already connected"); //$NON-NLS-1$
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Setting userID {0} for {1}", userID, this); //$NON-NLS-1$
+ }
+
+ this.userID = userID;
+ }
+
+ public ConnectorState getState()
+ {
+ return connectorState;
+ }
+
+ public void setState(ConnectorState newState) throws ConnectorException
+ {
+ ConnectorState oldState = getState();
+ if (newState != oldState)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Setting state {0} (was {1}) for {2}", newState, oldState.toString().toLowerCase(), this); //$NON-NLS-1$
+ }
+
+ connectorState = newState;
+ switch (newState)
+ {
+ case DISCONNECTED:
+ if (finishedConnecting != null)
+ {
+ finishedConnecting.countDown();
+ finishedConnecting = null;
+ }
+
+ if (finishedNegotiating != null)
+ {
+ finishedNegotiating.countDown();
+ finishedNegotiating = null;
+ }
+
+ break;
+
+ case CONNECTING:
+ finishedConnecting = new CountDownLatch(1);
+ finishedNegotiating = new CountDownLatch(1);
+ // The concrete implementation must advance state to NEGOTIATING or CONNECTED
+ break;
+
+ case NEGOTIATING:
+ finishedConnecting.countDown();
+ negotiationContext = createNegotiationContext();
+ getNegotiator().negotiate(negotiationContext);
+ break;
+
+ case CONNECTED:
+ negotiationContext = null;
+ deferredActivate(true);
+ finishedConnecting.countDown();
+ finishedNegotiating.countDown();
+ break;
+ }
+
+ fireEvent(new ConnectorStateEvent(this, oldState, newState));
+ }
+ }
+
+ public boolean isDisconnected()
+ {
+ return connectorState == ConnectorState.DISCONNECTED;
+ }
+
+ public boolean isConnecting()
+ {
+ return connectorState == ConnectorState.CONNECTING;
+ }
+
+ public boolean isNegotiating()
+ {
+ return connectorState == ConnectorState.NEGOTIATING;
+ }
+
+ public boolean isConnected()
+ {
+ if (negotiationException != null)
+ {
+ throw new ConnectorException("Connector negotiation failed", negotiationException); //$NON-NLS-1$
+ }
+
+ return connectorState == ConnectorState.CONNECTED;
+ }
+
+ public void connectAsync() throws ConnectorException
+ {
+ try
+ {
+ activate();
+ }
+ catch (ConnectorException ex)
+ {
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ throw new ConnectorException(ex);
+ }
+ }
+
+ /**
+ * @since 4.0
+ */
+ public void waitForConnection(long timeout) throws ConnectorException
+ {
+ String message = "Connection timeout after " + timeout + " milliseconds";
+ final long MAX_POLL_INTERVAL = 100L;
+ boolean withTimeout = timeout != NO_TIMEOUT;
+
+ try
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Waiting for connection..."); //$NON-NLS-1$
+ }
+
+ for (;;)
+ {
+ long t = MAX_POLL_INTERVAL;
+ if (withTimeout)
+ {
+ t = Math.min(MAX_POLL_INTERVAL, timeout);
+ timeout -= MAX_POLL_INTERVAL;
+ }
+
+ if (t <= 0)
+ {
+ break;
+ }
+
+ if (finishedNegotiating == null)
+ {
+ break;
+ }
+
+ if (finishedNegotiating.await(t, TimeUnit.MILLISECONDS))
+ {
+ break;
+ }
+ }
+
+ if (!isConnected())
+ {
+ throw new ConnectorException(message);
+ }
+ }
+ catch (ConnectorException ex)
+ {
+ setState(ConnectorState.DISCONNECTED);
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ setState(ConnectorState.DISCONNECTED);
+ throw new ConnectorException(ex);
+ }
+ }
+
+ /**
+ * @since 4.0
+ */
+ public void connect(long timeout) throws ConnectorException
+ {
+ connectAsync();
+ waitForConnection(timeout);
+ }
+
+ /**
+ * @since 4.0
+ */
+ public void connect() throws ConnectorException
+ {
+ connect(NO_TIMEOUT);
+ }
+
+ public void close()
+ {
+ LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG);
+ }
+
+ public boolean isClosed()
+ {
+ return !isActive();
+ }
+
+ public short getBufferCapacity()
+ {
+ return getConfig().getBufferProvider().getBufferCapacity();
+ }
+
+ public IBuffer provideBuffer()
+ {
+ return getConfig().getBufferProvider().provideBuffer();
+ }
+
+ public void retainBuffer(IBuffer buffer)
+ {
+ getConfig().getBufferProvider().retainBuffer(buffer);
+ }
+
+ protected void leaveConnecting()
+ {
+ if (getNegotiator() == null)
+ {
+ setState(ConnectorState.CONNECTED);
+ }
+ else
+ {
+ setState(ConnectorState.NEGOTIATING);
+ }
+ }
+
+ @Override
+ protected abstract INegotiationContext createNegotiationContext();
+
+ protected NegotiationException getNegotiationException()
+ {
+ return negotiationException;
+ }
+
+ protected void setNegotiationException(NegotiationException negotiationException)
+ {
+ this.negotiationException = negotiationException;
+ }
+
+ @Override
+ protected void initChannel(InternalChannel channel, IProtocol<?> protocol)
+ {
+ super.initChannel(channel, protocol);
+ channel.setUserID(getUserID());
+ }
+
+ @Override
+ protected void deregisterChannelFromPeer(InternalChannel channel) throws ChannelException
+ {
+ }
+
+ public Location getLocation()
+ {
+ return null;
+ }
+
+ public String getURL()
+ {
+ return null;
+ }
+
+ /**
+ * @since 4.1
+ */
+ @Override
+ public boolean isDeferredActivation()
+ {
+ return true;
+ }
+
+ @Override
+ protected void doBeforeOpenChannel(IProtocol<?> protocol)
+ {
+ super.doBeforeOpenChannel(protocol);
+ long timeout = getOpenChannelTimeout();
+ waitForConnection(timeout);
+ }
+
+ @Override
+ protected void doBeforeActivate() throws Exception
+ {
+ super.doBeforeActivate();
+ checkState(getConfig().getBufferProvider(), "getConfig().getBufferProvider()"); //$NON-NLS-1$
+
+ if (userID != null && getConfig().getNegotiator() == null)
+ {
+ throw new IllegalStateException("A user ID on this connector requires a negotiator"); //$NON-NLS-1$
+ }
+ }
+
+ @Override
+ protected void doActivate() throws Exception
+ {
+ super.doActivate();
+ setState(ConnectorState.CONNECTING);
+ }
+
+ @Override
+ protected void doDeactivate() throws Exception
+ {
+ setState(ConnectorState.DISCONNECTED);
+ super.doDeactivate();
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private static class ConnectorStateEvent extends Event implements IConnectorStateEvent
+ {
+ private static final long serialVersionUID = 1L;
+
+ private ConnectorState oldState;
+
+ private ConnectorState newState;
+
+ public ConnectorStateEvent(IConnector source, ConnectorState oldState, ConnectorState newState)
+ {
+ super(source);
+ this.oldState = oldState;
+ this.newState = newState;
+ }
+
+ @Override
+ public IConnector getSource()
+ {
+ return (IConnector)super.getSource();
+ }
+
+ public ConnectorState getOldState()
+ {
+ return oldState;
+ }
+
+ public ConnectorState getNewState()
+ {
+ return newState;
+ }
+
+ @Override
+ public String toString()
+ {
+ return MessageFormat.format("ConnectorStateEvent[source={0}, oldState={1}, newState={2}]", getSource(), //$NON-NLS-1$
+ getOldState(), getNewState());
+ }
+ }
+}

Back to the top