Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2008-11-12 17:42:21 +0000
committerEike Stepper2008-11-12 17:42:21 +0000
commit87ffc1e4b3038e67dc2759167f71e81d1103874d (patch)
tree6c0af40cb4f3a2b882a855e7c75725e2093760fb /plugins/org.eclipse.net4j
parent9740b5da2821adfa962f89ac7d204c8da1e7e5fc (diff)
downloadcdo-87ffc1e4b3038e67dc2759167f71e81d1103874d.tar.gz
cdo-87ffc1e4b3038e67dc2759167f71e81d1103874d.tar.xz
cdo-87ffc1e4b3038e67dc2759167f71e81d1103874d.zip
[251751] Provide progress monitoring for commit operations
https://bugs.eclipse.org/bugs/show_bug.cgi?id=251751
Diffstat (limited to 'plugins/org.eclipse.net4j')
-rw-r--r--plugins/org.eclipse.net4j/META-INF/MANIFEST.MF24
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java266
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java778
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/ContainerProtocolProvider.java5
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/TransportConfigurator.java2
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/IAcceptor.java2
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java24
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java59
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferState.java4
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java29
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelException.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalRemoteException.java)15
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelOutputStream.java5
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java12
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannelMultiplexer.java29
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java32
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java71
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java190
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java84
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledIndication.java41
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledRequest.java43
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressIndication.java43
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressRequest.java51
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java39
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionIndication.java16
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionRequest.java38
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java47
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java115
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java254
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java199
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java48
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java132
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java31
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/AbstractFailOverStrategy.java7
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/FailOverStrategy.java5
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/IFailOverStrategy.java7
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/NOOPFailOverStrategy.java5
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Acceptor.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/acceptor/Acceptor.java)6
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/Channel.java)54
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java360
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ClientProtocolFactory.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/ClientProtocolFactory.java)3
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java402
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java12
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/Protocol.java)104
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ServerProtocolFactory.java (renamed from plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/ServerProtocolFactory.java)3
44 files changed, 2409 insertions, 1287 deletions
diff --git a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
index e5c660af87..35b21c8595 100644
--- a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
@@ -19,14 +19,6 @@ Export-Package: org.eclipse.internal.net4j;version="2.0.0";
org.eclipse.net4j.http.common,
org.eclipse.net4j.http.tests,
org.eclipse.net4j.tests",
- org.eclipse.internal.net4j.acceptor;version="2.0.0";
- x-friends:="org.eclipse.net4j.http.server,
- org.eclipse.net4j.jvm,
- org.eclipse.net4j.tcp,
- org.eclipse.net4j.http,
- org.eclipse.net4j.http.common,
- org.eclipse.net4j.http.tests,
- org.eclipse.net4j.tests",
org.eclipse.internal.net4j.buffer;version="2.0.0";
x-friends:="org.eclipse.net4j.http.server,
org.eclipse.net4j.jvm,
@@ -36,22 +28,6 @@ Export-Package: org.eclipse.internal.net4j;version="2.0.0";
org.eclipse.net4j.http.tests,
org.eclipse.net4j.tests",
org.eclipse.internal.net4j.bundle;version="2.0.0";x-internal:=true,
- org.eclipse.internal.net4j.channel;version="2.0.0";
- x-friends:="org.eclipse.net4j.http,
- org.eclipse.net4j.http.common,
- org.eclipse.net4j.http.server,
- org.eclipse.net4j.tcp,
- org.eclipse.net4j.http.tests,
- org.eclipse.net4j.tests,
- org.eclipse.net4j.jvm",
- org.eclipse.internal.net4j.connector;version="2.0.0";
- x-friends:="org.eclipse.net4j.http,
- org.eclipse.net4j.http.common,
- org.eclipse.net4j.http.server,
- org.eclipse.net4j.jvm,
- org.eclipse.net4j.tcp,
- org.eclipse.net4j.http.tests,
- org.eclipse.net4j.tests",
org.eclipse.net4j;version="2.0.0",
org.eclipse.net4j.acceptor;version="2.0.0",
org.eclipse.net4j.buffer;version="2.0.0",
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
index 90effe2563..5e1a1f0f3e 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
@@ -14,6 +14,7 @@ import org.eclipse.net4j.buffer.BufferState;
import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.util.HexUtil;
+import org.eclipse.net4j.util.IErrorHandler;
import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.StringUtil;
import org.eclipse.net4j.util.om.trace.ContextTracer;
@@ -36,9 +37,11 @@ public class Buffer implements InternalBuffer
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_BUFFER, Buffer.class);
+ private IErrorHandler errorHandler;
+
private IBufferProvider bufferProvider;
- private short channelIndex;
+ private short channelID;
private boolean eos;
@@ -72,14 +75,14 @@ public class Buffer implements InternalBuffer
this.bufferProvider = bufferProvider;
}
- public short getChannelIndex()
+ public short getChannelID()
{
if (state == BufferState.INITIAL || state == BufferState.READING_HEADER)
{
throw new IllegalStateException("state == " + state); //$NON-NLS-1$
}
- return channelIndex;
+ return channelID;
}
public short getCapacity()
@@ -107,6 +110,7 @@ public class Buffer implements InternalBuffer
*/
public void release()
{
+ errorHandler = null;
if (bufferProvider != null)
{
bufferProvider.retainBuffer(this);
@@ -117,7 +121,7 @@ public class Buffer implements InternalBuffer
{
byteBuffer.clear();
state = BufferState.INITIAL;
- channelIndex = NO_CHANNEL;
+ channelID = NO_CHANNEL;
eos = false;
}
@@ -129,21 +133,50 @@ public class Buffer implements InternalBuffer
public ByteBuffer startGetting(SocketChannel socketChannel) throws IOException
{
- if (state != BufferState.INITIAL && state != BufferState.READING_HEADER && state != BufferState.READING_BODY)
+ try
{
- throw new IllegalStateException("state == " + state); //$NON-NLS-1$
- }
+ if (state != BufferState.INITIAL && state != BufferState.READING_HEADER && state != BufferState.READING_BODY)
+ {
+ throw new IllegalStateException("state == " + state); //$NON-NLS-1$
+ }
- if (state == BufferState.INITIAL)
- {
- byteBuffer.limit(IBuffer.HEADER_SIZE);
- state = BufferState.READING_HEADER;
- }
+ if (state == BufferState.INITIAL)
+ {
+ byteBuffer.limit(IBuffer.HEADER_SIZE);
+ state = BufferState.READING_HEADER;
+ }
- if (state == BufferState.READING_HEADER)
- {
- int num = socketChannel.read(byteBuffer);
- if (num == -1)
+ if (state == BufferState.READING_HEADER)
+ {
+ int num = socketChannel.read(byteBuffer);
+ if (num == -1)
+ {
+ throw new ClosedChannelException();
+ }
+
+ if (byteBuffer.hasRemaining())
+ {
+ return null;
+ }
+
+ byteBuffer.flip();
+ channelID = byteBuffer.getShort();
+ short payloadSize = byteBuffer.getShort();
+ if (payloadSize < 0)
+ {
+ eos = true;
+ payloadSize = (short)-payloadSize;
+ }
+
+ payloadSize -= EOS_OFFSET;
+
+ byteBuffer.clear();
+ byteBuffer.limit(payloadSize);
+ state = BufferState.READING_BODY;
+ }
+
+ // state == State.READING_BODY
+ if (socketChannel.read(byteBuffer) == -1)
{
throw new ClosedChannelException();
}
@@ -153,67 +186,69 @@ public class Buffer implements InternalBuffer
return null;
}
- byteBuffer.flip();
- channelIndex = byteBuffer.getShort();
- short payloadSize = byteBuffer.getShort();
- if (payloadSize < 0)
+ if (TRACER.isEnabled())
{
- eos = true;
- payloadSize = (short)-payloadSize;
+ TRACER.trace("Read " + byteBuffer.limit() + " bytes" //$NON-NLS-1$ //$NON-NLS-2$
+ + (eos ? " (EOS)" : "") + StringUtil.NL + formatContent(false)); //$NON-NLS-1$ //$NON-NLS-2$
}
- payloadSize -= EOS_OFFSET;
-
- byteBuffer.clear();
- byteBuffer.limit(payloadSize);
- state = BufferState.READING_BODY;
+ byteBuffer.flip();
+ state = BufferState.GETTING;
+ return byteBuffer;
}
-
- // state == State.READING_BODY
- if (socketChannel.read(byteBuffer) == -1)
+ catch (IOException ex)
{
- throw new ClosedChannelException();
+ handleError(ex);
+ throw ex;
}
-
- if (byteBuffer.hasRemaining())
+ catch (RuntimeException ex)
{
- return null;
+ handleError(ex);
+ throw ex;
}
-
- if (TRACER.isEnabled())
+ catch (Error ex)
{
- TRACER.trace("Read " + byteBuffer.limit() + " bytes" //$NON-NLS-1$ //$NON-NLS-2$
- + (eos ? " (EOS)" : "") + StringUtil.NL + formatContent(false)); //$NON-NLS-1$ //$NON-NLS-2$
+ handleError(ex);
+ throw ex;
}
-
- byteBuffer.flip();
- state = BufferState.GETTING;
- return byteBuffer;
}
- public ByteBuffer startPutting(short channelIndex)
+ public ByteBuffer startPutting(short channelID)
{
- if (state == BufferState.PUTTING)
+ try
{
- if (channelIndex != this.channelIndex)
+ if (state == BufferState.PUTTING)
+ {
+ if (channelID != this.channelID)
+ {
+ throw new IllegalArgumentException("channelID != this.channelID"); //$NON-NLS-1$
+ }
+ }
+ else if (state != BufferState.INITIAL)
{
- throw new IllegalArgumentException("channelIndex != this.channelIndex"); //$NON-NLS-1$
+ throw new IllegalStateException("state: " + state); //$NON-NLS-1$
}
+ else
+ {
+ state = BufferState.PUTTING;
+ this.channelID = channelID;
+
+ byteBuffer.clear();
+ byteBuffer.position(IBuffer.HEADER_SIZE);
+ }
+
+ return byteBuffer;
}
- else if (state != BufferState.INITIAL)
+ catch (RuntimeException ex)
{
- throw new IllegalStateException("state: " + state); //$NON-NLS-1$
+ handleError(ex);
+ throw ex;
}
- else
+ catch (Error ex)
{
- state = BufferState.PUTTING;
- this.channelIndex = channelIndex;
-
- byteBuffer.clear();
- byteBuffer.position(IBuffer.HEADER_SIZE);
+ handleError(ex);
+ throw ex;
}
-
- return byteBuffer;
}
/**
@@ -221,62 +256,93 @@ public class Buffer implements InternalBuffer
*/
public boolean write(SocketChannel socketChannel) throws IOException
{
- if (state != BufferState.PUTTING && state != BufferState.WRITING)
+ try
{
- throw new IllegalStateException("state == " + state); //$NON-NLS-1$
- }
+ if (state != BufferState.PUTTING && state != BufferState.WRITING)
+ {
+ throw new IllegalStateException("state == " + state); //$NON-NLS-1$
+ }
- if (state == BufferState.PUTTING)
- {
- if (channelIndex == NO_CHANNEL)
+ if (state == BufferState.PUTTING)
{
- throw new IllegalStateException("channelIndex == NO_CHANNEL"); //$NON-NLS-1$
+ if (channelID == NO_CHANNEL)
+ {
+ throw new IllegalStateException("channelID == NO_CHANNEL"); //$NON-NLS-1$
+ }
+
+ int payloadSize = byteBuffer.position() - IBuffer.HEADER_SIZE + EOS_OFFSET;
+ if (eos)
+ {
+ payloadSize = -payloadSize;
+ }
+
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Writing " + (Math.abs(payloadSize) - 1) + " bytes" //$NON-NLS-1$ //$NON-NLS-2$
+ + (eos ? " (EOS)" : "") + StringUtil.NL + formatContent(false)); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+
+ byteBuffer.flip();
+ byteBuffer.putShort(channelID);
+ byteBuffer.putShort((short)payloadSize);
+ byteBuffer.position(0);
+ state = BufferState.WRITING;
}
- int payloadSize = byteBuffer.position() - IBuffer.HEADER_SIZE + EOS_OFFSET;
- if (eos)
+ int numBytes = socketChannel.write(byteBuffer);
+ if (numBytes == -1)
{
- payloadSize = -payloadSize;
+ throw new IOException("Channel closed"); //$NON-NLS-1$
}
- if (TRACER.isEnabled())
+ if (byteBuffer.hasRemaining())
{
- TRACER.trace("Writing " + (Math.abs(payloadSize) - 1) + " bytes" //$NON-NLS-1$ //$NON-NLS-2$
- + (eos ? " (EOS)" : "") + StringUtil.NL + formatContent(false)); //$NON-NLS-1$ //$NON-NLS-2$
+ return false;
}
- byteBuffer.flip();
- byteBuffer.putShort(channelIndex);
- byteBuffer.putShort((short)payloadSize);
- byteBuffer.position(0);
- state = BufferState.WRITING;
+ clear();
+ return true;
}
-
- int numBytes = socketChannel.write(byteBuffer);
- if (numBytes == -1)
+ catch (IOException ex)
{
- throw new IOException("Channel closed"); //$NON-NLS-1$
+ handleError(ex);
+ throw ex;
}
-
- if (byteBuffer.hasRemaining())
+ catch (RuntimeException ex)
{
- return false;
+ handleError(ex);
+ throw ex;
+ }
+ catch (Error ex)
+ {
+ handleError(ex);
+ throw ex;
}
-
- clear();
- return true;
}
public void flip()
{
- if (state != BufferState.PUTTING)
+ try
{
- throw new IllegalStateException("state == " + state); //$NON-NLS-1$
- }
+ if (state != BufferState.PUTTING)
+ {
+ throw new IllegalStateException("state == " + state); //$NON-NLS-1$
+ }
- byteBuffer.flip();
- byteBuffer.position(IBuffer.HEADER_SIZE);
- state = BufferState.GETTING;
+ byteBuffer.flip();
+ byteBuffer.position(IBuffer.HEADER_SIZE);
+ state = BufferState.GETTING;
+ }
+ catch (RuntimeException ex)
+ {
+ handleError(ex);
+ throw ex;
+ }
+ catch (Error ex)
+ {
+ handleError(ex);
+ throw ex;
+ }
}
@Override
@@ -319,4 +385,24 @@ public class Buffer implements InternalBuffer
byteBuffer.limit(oldLimit);
}
}
+
+ public IErrorHandler getErrorHandler()
+ {
+ return errorHandler;
+ }
+
+ public void setErrorHandler(IErrorHandler errorHandler)
+ {
+ this.errorHandler = errorHandler;
+ }
+
+ private void handleError(Throwable t)
+ {
+ OM.LOG.error(t);
+ if (errorHandler != null)
+ {
+ errorHandler.handleError(t);
+ release();
+ }
+ }
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java
deleted file mode 100644
index ef929e6122..0000000000
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java
+++ /dev/null
@@ -1,778 +0,0 @@
-/***************************************************************************
- * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- **************************************************************************/
-package org.eclipse.internal.net4j.connector;
-
-import org.eclipse.net4j.ITransportConfig;
-import org.eclipse.net4j.buffer.IBuffer;
-import org.eclipse.net4j.channel.IChannel;
-import org.eclipse.net4j.channel.IChannelMultiplexer;
-import org.eclipse.net4j.connector.ConnectorException;
-import org.eclipse.net4j.connector.ConnectorState;
-import org.eclipse.net4j.connector.IConnector;
-import org.eclipse.net4j.connector.IConnectorStateEvent;
-import org.eclipse.net4j.protocol.ClientProtocolFactory;
-import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.protocol.IProtocolProvider;
-import org.eclipse.net4j.protocol.ServerProtocolFactory;
-import org.eclipse.net4j.util.StringUtil;
-import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
-import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException;
-import org.eclipse.net4j.util.container.Container;
-import org.eclipse.net4j.util.event.Event;
-import org.eclipse.net4j.util.event.INotifier;
-import org.eclipse.net4j.util.factory.FactoryKey;
-import org.eclipse.net4j.util.factory.IFactoryKey;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.monitor.MonitorUtil;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-import org.eclipse.net4j.util.security.INegotiationContext;
-import org.eclipse.net4j.util.security.INegotiator;
-import org.eclipse.net4j.util.security.NegotiationException;
-
-import org.eclipse.internal.net4j.TransportConfig;
-import org.eclipse.internal.net4j.bundle.OM;
-import org.eclipse.internal.net4j.channel.Channel;
-
-import org.eclipse.spi.net4j.InternalChannel;
-import org.eclipse.spi.net4j.InternalConnector;
-
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Eike Stepper
- */
-public abstract class Connector extends Container<IChannel> implements InternalConnector
-{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, Connector.class);
-
- private String userID;
-
- private ITransportConfig config;
-
- private long channelTimeout = IChannelMultiplexer.DEFAULT_CHANNEL_TIMEOUT;
-
- private transient ConnectorState connectorState = ConnectorState.DISCONNECTED;
-
- private transient InternalChannel[] channels = {};
-
- @ExcludeFromDump
- private transient Object channelsLock = new Object();
-
- @ExcludeFromDump
- private transient CountDownLatch finishedConnecting;
-
- @ExcludeFromDump
- private transient CountDownLatch finishedNegotiating;
-
- @ExcludeFromDump
- private transient INegotiationContext negotiationContext;
-
- @ExcludeFromDump
- private transient NegotiationException negotiationException;
-
- public Connector()
- {
- }
-
- public synchronized ITransportConfig getConfig()
- {
- if (config == null)
- {
- config = new TransportConfig();
- }
-
- return config;
- }
-
- public synchronized void setConfig(ITransportConfig config)
- {
- checkInactive();
- this.config = config;
- }
-
- public INegotiator getNegotiator()
- {
- return getConfig().getNegotiator();
- }
-
- public void setNegotiator(INegotiator negotiator)
- {
- getConfig().setNegotiator(negotiator);
- }
-
- public INegotiationContext getNegotiationContext()
- {
- return negotiationContext;
- }
-
- public long getChannelTimeout()
- {
- if (channelTimeout == IChannelMultiplexer.DEFAULT_CHANNEL_TIMEOUT)
- {
- return OM.BUNDLE.getDebugSupport().getDebugOption("channel.timeout", 10000);
- }
-
- return channelTimeout;
- }
-
- public void setChannelTimeout(long channelTimeout)
- {
- this.channelTimeout = channelTimeout;
- }
-
- public boolean isClient()
- {
- return getLocation() == Location.CLIENT;
- }
-
- public boolean isServer()
- {
- return getLocation() == Location.SERVER;
- }
-
- public String getUserID()
- {
- return userID;
- }
-
- public void setUserID(String userID)
- {
- checkState(getState() != ConnectorState.CONNECTED, "Connector is already connected");
- if (TRACER.isEnabled())
- {
- TRACER.format("Setting userID {0} for {1}", userID, this);
- }
-
- this.userID = userID;
- }
-
- public ConnectorState getState()
- {
- return connectorState;
- }
-
- public void setState(ConnectorState newState) throws ConnectorException
- {
- ConnectorState oldState = getState();
- if (newState != oldState)
- {
- if (TRACER.isEnabled())
- {
- TRACER.format("Setting state {0} (was {1}) for {2}", newState, oldState.toString().toLowerCase(), this);
- }
-
- connectorState = newState;
- switch (newState)
- {
- case DISCONNECTED:
- if (finishedConnecting != null)
- {
- finishedConnecting.countDown();
- finishedConnecting = null;
- }
-
- if (finishedNegotiating != null)
- {
- finishedNegotiating.countDown();
- finishedNegotiating = null;
- }
- break;
-
- case CONNECTING:
- finishedConnecting = new CountDownLatch(1);
- finishedNegotiating = new CountDownLatch(1);
- // The concrete implementation must advance state to NEGOTIATING or CONNECTED
- break;
-
- case NEGOTIATING:
- finishedConnecting.countDown();
- negotiationContext = createNegotiationContext();
- getNegotiator().negotiate(negotiationContext);
- break;
-
- case CONNECTED:
- negotiationContext = null;
- deferredActivate();
- finishedConnecting.countDown();
- finishedNegotiating.countDown();
- break;
- }
-
- fireEvent(new ConnectorStateEvent(this, oldState, newState));
- }
- }
-
- public boolean isDisconnected()
- {
- return connectorState == ConnectorState.DISCONNECTED;
- }
-
- public boolean isConnecting()
- {
- return connectorState == ConnectorState.CONNECTING;
- }
-
- public boolean isNegotiating()
- {
- return connectorState == ConnectorState.NEGOTIATING;
- }
-
- public boolean isConnected()
- {
- if (negotiationException != null)
- {
- throw new ConnectorException("Connector negotiation failed", negotiationException);
- }
-
- return connectorState == ConnectorState.CONNECTED;
- }
-
- public void connectAsync() throws ConnectorException
- {
- try
- {
- activate();
- }
- catch (ConnectorException ex)
- {
- throw ex;
- }
- catch (Exception ex)
- {
- throw new ConnectorException(ex);
- }
- }
-
- public boolean waitForConnection(long timeout) throws ConnectorException
- {
- final long MAX_POLL_INTERVAL = 100L;
- boolean withTimeout = timeout != NO_TIMEOUT;
-
- try
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Waiting for connection...");
- }
-
- for (;;)
- {
- long t = MAX_POLL_INTERVAL;
- if (withTimeout)
- {
- t = Math.min(MAX_POLL_INTERVAL, timeout);
- timeout -= MAX_POLL_INTERVAL;
- }
-
- if (t <= 0)
- {
- break;
- }
-
- if (finishedNegotiating == null)
- {
- break;
- }
-
- if (finishedNegotiating.await(t, TimeUnit.MILLISECONDS))
- {
- break;
- }
-
- if (MonitorUtil.isCanceled())
- {
- break;
- }
- }
-
- return isConnected();
- }
- catch (InterruptedException ex)
- {
- return false;
- }
- }
-
- public boolean connect(long timeout) throws ConnectorException
- {
- connectAsync();
- return waitForConnection(timeout);
- }
-
- public boolean connect() throws ConnectorException
- {
- return connect(NO_TIMEOUT);
- }
-
- public ConnectorException disconnect()
- {
- Exception ex = deactivate();
- if (ex == null)
- {
- return null;
- }
-
- if (ex instanceof ConnectorException)
- {
- return (ConnectorException)ex;
- }
-
- return new ConnectorException(ex);
- }
-
- public final List<IChannel> getChannels()
- {
- List<IChannel> result = new ArrayList<IChannel>(0);
- synchronized (channelsLock)
- {
- for (int i = 0; i < channels.length; i++)
- {
- IChannel channel = channels[i];
- if (LifecycleUtil.isActive(channel))
- {
- result.add(channel);
- }
- }
- }
-
- return result;
- }
-
- @Override
- public boolean isEmpty()
- {
- return getElements().length == 0;
- }
-
- public IChannel[] getElements()
- {
- List<IChannel> list = getChannels();
- return list.toArray(new IChannel[list.size()]);
- }
-
- public InternalChannel openChannel() throws ConnectorException
- {
- return openChannel((IProtocol<?>)null);
- }
-
- public InternalChannel openChannel(String protocolID, Object infraStructure) throws ConnectorException
- {
- IProtocol<?> protocol = createProtocol(protocolID, infraStructure);
- if (protocol == null)
- {
- throw new IllegalArgumentException("Unknown protocolID: " + protocolID);
- }
-
- return openChannel(protocol);
- }
-
- public InternalChannel openChannel(IProtocol<?> protocol) throws ConnectorException
- {
- long openChannelTimeout = getChannelTimeout();
- long start = System.currentTimeMillis();
- if (!waitForConnection(openChannelTimeout))
- {
- throw new ConnectorException("Connector not connected");
- }
-
- final long elapsed = System.currentTimeMillis() - start;
- InternalChannel channel = createChannel();
- initChannel(channel, protocol);
- addChannelWithoutIndex(channel);
-
- try
- {
- try
- {
- registerChannelWithPeer(channel.getIndex(), openChannelTimeout - elapsed, protocol);
- }
- catch (TimeoutRuntimeException ex)
- {
- // Adjust the message for the complete timeout time
- throw new TimeoutRuntimeException("Registration timeout after " + openChannelTimeout + " milliseconds");
- }
- }
- catch (ConnectorException ex)
- {
- throw ex;
- }
- catch (Exception ex)
- {
- throw new ConnectorException(ex);
- }
-
- return channel;
- }
-
- public InternalChannel inverseOpenChannel(short channelIndex, String protocolID)
- {
- IProtocol<?> protocol = createProtocol(protocolID, null);
-
- InternalChannel channel = createChannel();
- initChannel(channel, protocol);
- channel.setChannelIndex(channelIndex);
- addChannelWithIndex(channel);
- return channel;
- }
-
- public final InternalChannel getChannel(short channelIndex)
- {
- int index = getChannelsArrayIndex(channelIndex);
- synchronized (channelsLock)
- {
- if (channels == null || index >= channels.length)
- {
- return null;
- }
-
- return channels[index];
- }
- }
-
- protected InternalChannel createChannel()
- {
- return new Channel();
- }
-
- private void initChannel(InternalChannel channel, IProtocol<?> protocol)
- {
- channel.setMultiplexer(this);
- channel.setReceiveExecutor(getConfig().getReceiveExecutor());
- channel.setUserID(getUserID());
- if (protocol != null)
- {
- protocol.setChannel(channel);
- LifecycleUtil.activate(protocol);
- if (TRACER.isEnabled())
- {
- String protocolType = protocol == null ? null : protocol.getType();
- TRACER.format("Opening channel with protocol {0}", protocolType);
- }
-
- channel.setReceiveHandler(protocol);
- }
- else
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Opening channel without protocol");
- }
- }
- }
-
- private void addChannelWithIndex(InternalChannel channel)
- {
- short channelIndex = channel.getIndex();
- int index = getChannelsArrayIndex(channelIndex);
- synchronized (channelsLock)
- {
- if (index >= channels.length)
- {
- InternalChannel[] newChannels = new InternalChannel[index + 1];
- System.arraycopy(channels, 0, newChannels, 0, channels.length);
- channels = newChannels;
- }
-
- channels[index] = channel;
- }
-
- LifecycleUtil.activate(channel);
- fireElementAddedEvent(channel);
- }
-
- private void addChannelWithoutIndex(InternalChannel channel)
- {
- final short INCREMENT = (short)(isClient() ? 1 : -1);
- short channelIndex = INCREMENT;
- synchronized (channelsLock)
- {
- for (;;)
- {
- int index = getChannelsArrayIndex(channelIndex);
- if (index >= channels.length)
- {
- channel.setChannelIndex(channelIndex);
- addChannelWithIndex(channel);
- return;
- }
-
- if (channels[index] == null)
- {
- channel.setChannelIndex(channelIndex);
- channels[index] = channel;
-
- LifecycleUtil.activate(channel);
- fireElementAddedEvent(channel);
- return;
- }
-
- channelIndex += INCREMENT;
- }
- }
- }
-
- public void closeChannel(InternalChannel channel) throws ConnectorException
- {
- InternalChannel internalChannel = channel;
- deregisterChannelFromPeer(internalChannel, getChannelTimeout());
- removeChannel(internalChannel, false);
- }
-
- public void inverseCloseChannel(short channelIndex) throws ConnectorException
- {
- InternalChannel channel = getChannel(channelIndex);
- if (channel != null && channel.isActive())
- {
- removeChannel(channel, true);
- }
- }
-
- private void removeChannel(InternalChannel channel, boolean inverse)
- {
- try
- {
- short channelIndex = channel.getIndex();
- int index = getChannelsArrayIndex(channelIndex);
- synchronized (channelsLock)
- {
- if (index < channels.length)
- {
- if (channels[index] != channel)
- {
- throw new IllegalStateException("Wrong channel: " + channels[index]);
- }
-
- if (TRACER.isEnabled())
- {
- TRACER.trace("Removing " + channel);
- }
-
- if (index == channels.length - 1)
- {
- --index;
- while (index > 0 && channels[index] == null)
- {
- --index;
- }
-
- if (index == 0)
- {
- channels = new InternalChannel[0];
- }
- else
- {
- InternalChannel[] newChannels = new InternalChannel[index + 1];
- System.arraycopy(channels, 0, newChannels, 0, newChannels.length);
- channels = newChannels;
- }
- }
- else
- {
- channels[index] = null;
- }
- }
- }
-
- channel.finishDeactivate(inverse);
- }
- catch (RuntimeException ex)
- {
- OM.LOG.error(ex);
- throw ex;
- }
- }
-
- private int getChannelsArrayIndex(short channelIndex)
- {
- if (channelIndex < 0)
- {
- return ~channelIndex << 1;
- }
-
- return (channelIndex << 1) - 1;
- }
-
- public short getBufferCapacity()
- {
- return getConfig().getBufferProvider().getBufferCapacity();
- }
-
- public IBuffer provideBuffer()
- {
- return getConfig().getBufferProvider().provideBuffer();
- }
-
- public void retainBuffer(IBuffer buffer)
- {
- getConfig().getBufferProvider().retainBuffer(buffer);
- }
-
- protected void leaveConnecting()
- {
- if (getNegotiator() == null)
- {
- setState(ConnectorState.CONNECTED);
- }
- else
- {
- setState(ConnectorState.NEGOTIATING);
- }
- }
-
- protected abstract INegotiationContext createNegotiationContext();
-
- protected NegotiationException getNegotiationException()
- {
- return negotiationException;
- }
-
- protected void setNegotiationException(NegotiationException negotiationException)
- {
- this.negotiationException = negotiationException;
- }
-
- @SuppressWarnings("unchecked")
- protected <INFRA_STRUCTURE> IProtocol<INFRA_STRUCTURE> createProtocol(String type, INFRA_STRUCTURE infraStructure)
- {
- if (StringUtil.isEmpty(type))
- {
- return null;
- }
-
- IProtocolProvider protocolProvider = getConfig().getProtocolProvider();
- if (protocolProvider == null)
- {
- throw new ConnectorException("No protocol provider configured");
- }
-
- IProtocol<INFRA_STRUCTURE> protocol = (IProtocol<INFRA_STRUCTURE>)protocolProvider.getProtocol(type);
- if (protocol == null)
- {
- throw new ConnectorException("Invalid protocol factory: " + type);
- }
-
- protocol.setBufferProvider(getConfig().getBufferProvider());
- protocol.setExecutorService(getConfig().getReceiveExecutor());
- if (infraStructure != null)
- {
- protocol.setInfraStructure(infraStructure);
- }
-
- return protocol;
- }
-
- protected IFactoryKey createProtocolFactoryKey(String type)
- {
- switch (getLocation())
- {
- case SERVER:
- return new FactoryKey(ServerProtocolFactory.PRODUCT_GROUP, type);
- case CLIENT:
- return new FactoryKey(ClientProtocolFactory.PRODUCT_GROUP, type);
- default:
- throw new IllegalStateException();
- }
- }
-
- @Override
- protected boolean isDeferredActivation()
- {
- return true;
- }
-
- @Override
- protected void doBeforeActivate() throws Exception
- {
- super.doBeforeActivate();
- if (getConfig().getBufferProvider() == null)
- {
- throw new IllegalStateException("getConfig().getBufferProvider() == null");
- }
- }
-
- @Override
- protected void doActivate() throws Exception
- {
- super.doActivate();
- setState(ConnectorState.CONNECTING);
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- setState(ConnectorState.DISCONNECTED);
- synchronized (channelsLock)
- {
- for (short i = 0; i < channels.length; i++)
- {
- InternalChannel channel = channels[i];
- if (channel != null)
- {
- LifecycleUtil.deactivate(channel);
- }
- }
-
- channels = new InternalChannel[0];
- }
-
- super.doDeactivate();
- }
-
- protected abstract void registerChannelWithPeer(short channelIndex, long timeout, IProtocol<?> protocol)
- throws ConnectorException;
-
- protected abstract void deregisterChannelFromPeer(InternalChannel channel, long timeout) throws ConnectorException;
-
- /**
- * @author Eike Stepper
- */
- private static class ConnectorStateEvent extends Event implements IConnectorStateEvent
- {
- private static final long serialVersionUID = 1L;
-
- private ConnectorState oldState;
-
- private ConnectorState newState;
-
- public ConnectorStateEvent(INotifier notifier, ConnectorState oldState, ConnectorState newState)
- {
- super(notifier);
- this.oldState = oldState;
- this.newState = newState;
- }
-
- public IConnector getConnector()
- {
- return (IConnector)getSource();
- }
-
- public ConnectorState getOldState()
- {
- return oldState;
- }
-
- public ConnectorState getNewState()
- {
- return newState;
- }
-
- @Override
- public String toString()
- {
- return MessageFormat.format("ConnectorStateEvent[source={0}, oldState={1}, newState={2}]", getSource(),
- getOldState(), getNewState());
- }
- }
-}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/ContainerProtocolProvider.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/ContainerProtocolProvider.java
index c5c855d7a6..74dc4ae96f 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/ContainerProtocolProvider.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/ContainerProtocolProvider.java
@@ -10,13 +10,14 @@
**************************************************************************/
package org.eclipse.net4j;
-import org.eclipse.net4j.protocol.ClientProtocolFactory;
import org.eclipse.net4j.protocol.IProtocol;
import org.eclipse.net4j.protocol.IProtocolProvider;
-import org.eclipse.net4j.protocol.ServerProtocolFactory;
import org.eclipse.net4j.util.concurrent.NonBlockingLongCounter;
import org.eclipse.net4j.util.container.IManagedContainer;
+import org.eclipse.spi.net4j.ClientProtocolFactory;
+import org.eclipse.spi.net4j.ServerProtocolFactory;
+
/**
* @author Eike Stepper
* @since 2.0
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/TransportConfigurator.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/TransportConfigurator.java
index 7af26c36ba..03ddadabd4 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/TransportConfigurator.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/TransportConfigurator.java
@@ -16,10 +16,10 @@ import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.net4j.util.security.INegotiator;
import org.eclipse.net4j.util.security.NegotiatorFactory;
-import org.eclipse.internal.net4j.acceptor.Acceptor;
import org.eclipse.internal.net4j.bundle.OM;
import org.eclipse.core.runtime.CoreException;
+import org.eclipse.spi.net4j.Acceptor;
import org.eclipse.spi.net4j.AcceptorFactory;
import org.w3c.dom.Document;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/IAcceptor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/IAcceptor.java
index 9d47f78c0c..eccc93ce52 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/IAcceptor.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/IAcceptor.java
@@ -13,7 +13,7 @@ package org.eclipse.net4j.acceptor;
import org.eclipse.net4j.connector.IConnector;
import org.eclipse.net4j.util.container.IContainer;
-import org.eclipse.internal.net4j.acceptor.Acceptor;
+import org.eclipse.spi.net4j.Acceptor;
/**
* Accepts incoming connection requests from {@link Location#CLIENT client} {@link IConnector connectors} and creates
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
index 28732ed7ca..8079445eb8 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
@@ -42,6 +42,8 @@ public class BufferInputStream extends InputStream implements IBufferHandler
private RuntimeException exception;
+ private long stopTimeMillis;
+
public BufferInputStream()
{
}
@@ -59,6 +61,14 @@ public class BufferInputStream extends InputStream implements IBufferHandler
/**
* @since 2.0
*/
+ public void restartTimeout()
+ {
+ stopTimeMillis = System.currentTimeMillis() + getMillisBeforeTimeout();
+ }
+
+ /**
+ * @since 2.0
+ */
public RuntimeException getException()
{
return exception;
@@ -129,11 +139,10 @@ public class BufferInputStream extends InputStream implements IBufferHandler
protected boolean ensureBuffer() throws IOException
{
final long check = getMillisInterruptCheck();
- final long timeout = getMillisBeforeTimeout();
try
{
- if (timeout == NO_TIMEOUT)
+ if (getMillisBeforeTimeout() == NO_TIMEOUT)
{
while (currentBuffer == null)
{
@@ -153,8 +162,7 @@ public class BufferInputStream extends InputStream implements IBufferHandler
}
else
{
- // TODO Consider something faster than currentTimeMillis(), maybe less accurate?
- final long stop = System.currentTimeMillis() + timeout;
+ restartTimeout();
while (currentBuffer == null)
{
if (exception != null)
@@ -168,7 +176,13 @@ public class BufferInputStream extends InputStream implements IBufferHandler
return false;
}
- final long remaining = stop - System.currentTimeMillis();
+ long remaining;
+ synchronized (this)
+ {
+ remaining = stopTimeMillis;
+ }
+
+ remaining -= System.currentTimeMillis();
if (remaining <= 0)
{
return false;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java
index c7710aea49..01203a8aea 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java
@@ -11,6 +11,9 @@
package org.eclipse.net4j.buffer;
import org.eclipse.net4j.util.HexUtil;
+import org.eclipse.net4j.util.IErrorHandler;
+import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
+import org.eclipse.net4j.util.io.IORuntimeException;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.om.trace.ContextTracer;
@@ -29,15 +32,33 @@ public class BufferOutputStream extends OutputStream
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_BUFFER_STREAM, BufferOutputStream.class);
- private IBufferHandler bufferHandler;
-
private IBufferProvider bufferProvider;
+ private IBufferHandler bufferHandler;
+
private IBuffer currentBuffer;
- private short channelIndex;
+ private short channelID;
+
+ private RuntimeException exception;
+
+ @ExcludeFromDump
+ private transient IErrorHandler writeErrorHandler = new IErrorHandler()
+ {
+ public void handleError(Throwable t)
+ {
+ if (t instanceof RuntimeException)
+ {
+ setException((RuntimeException)t);
+ }
+ else
+ {
+ setException(new IORuntimeException(t));
+ }
+ }
+ };
- public BufferOutputStream(IBufferHandler bufferHandler, IBufferProvider bufferProvider, short channelIndex)
+ public BufferOutputStream(IBufferHandler bufferHandler, IBufferProvider bufferProvider, short channelID)
{
if (bufferHandler == null)
{
@@ -51,12 +72,28 @@ public class BufferOutputStream extends OutputStream
this.bufferHandler = bufferHandler;
this.bufferProvider = bufferProvider;
- this.channelIndex = channelIndex;
+ this.channelID = channelID;
+ }
+
+ public BufferOutputStream(IBufferHandler bufferHandler, short channelID)
+ {
+ this(bufferHandler, extractBufferProvider(bufferHandler), channelID);
}
- public BufferOutputStream(IBufferHandler bufferHandler, short channelIndex)
+ /**
+ * @since 2.0
+ */
+ public RuntimeException getException()
{
- this(bufferHandler, extractBufferProvider(bufferHandler), channelIndex);
+ return exception;
+ }
+
+ /**
+ * @since 2.0
+ */
+ public void setException(RuntimeException exception)
+ {
+ this.exception = exception;
}
@SuppressWarnings("deprecation")
@@ -123,10 +160,16 @@ public class BufferOutputStream extends OutputStream
protected void ensureBuffer()
{
+ if (exception != null)
+ {
+ throw exception;
+ }
+
if (currentBuffer == null)
{
currentBuffer = bufferProvider.provideBuffer();
- currentBuffer.startPutting(channelIndex);
+ currentBuffer.setErrorHandler(writeErrorHandler);
+ currentBuffer.startPutting(channelID);
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferState.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferState.java
index 5db9105c80..74682509e7 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferState.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferState.java
@@ -35,8 +35,8 @@ public enum BufferState
* <p>
* A transition to {@link #PUTTING} can be triggered by calling {@link IBuffer#startPutting(short)} once. If the
* buffer is intended to be passed to an {@link org.eclipse.net4j.channel.IChannel IChannel} later the
- * {@link org.eclipse.net4j.channel.IChannel#getChannelIndex() channel index} of that Channel has to be passed because
- * it is part of the buffer's header. A {@link ByteBuffer} is returned that can be used for putting data.
+ * {@link org.eclipse.net4j.channel.IChannel#getChannelID() channel index} of that Channel has to be passed because it
+ * is part of the buffer's header. A {@link ByteBuffer} is returned that can be used for putting data.
* <p>
* A transition to {@link #GETTING} can be triggered by calling {@link IBuffer#startGetting(SocketChannel)} repeatedly
* until it finally returns a {@link ByteBuffer} that can be used for getting data.
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java
index 3e6cad36e3..bc4c878abb 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java
@@ -11,6 +11,7 @@
package org.eclipse.net4j.buffer;
import org.eclipse.net4j.channel.IChannel;
+import org.eclipse.net4j.util.IErrorHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -51,7 +52,7 @@ import java.nio.channels.SocketChannel;
* An example for <b>putting</b> values into a buffer and writing it to a {@link SocketChannel}:
* <p>
* <pre style="background-color:#ffffc8; border-width:1px; border-style:solid; padding:.5em;"> // Obtain a fresh buffer
- * Buffer buffer = bufferProvider.getBuffer(); // Start filling the buffer for channelIndex 4711 ByteBuffer byteBuffer =
+ * Buffer buffer = bufferProvider.getBuffer(); // Start filling the buffer for channelID 4711 ByteBuffer byteBuffer =
* buffer.startPutting(4711); byteBuffer.putDouble(15.47); // Write the contents of the Buffer to a // SocketChannel
* without blocking while (!buffer.write(socketChannel)) { // Do something else } </pre> An example for reading a buffer
* from a {@link SocketChannel} and <b>getting</b> values from it:
@@ -72,8 +73,8 @@ import java.nio.channels.SocketChannel;
public interface IBuffer
{
/**
- * Possible argument value of {@link #startPutting(short)} and possible return value of {@link #getChannelIndex()}
- * that indicates that this buffer is not intended to be passed to a {@link SocketChannel}.
+ * Possible argument value of {@link #startPutting(short)} and possible return value of {@link #getChannelID()} that
+ * indicates that this buffer is not intended to be passed to a {@link SocketChannel}.
*/
public static final short NO_CHANNEL = Short.MIN_VALUE;
@@ -102,8 +103,10 @@ public interface IBuffer
/**
* Returns the channel index value stored in the header of this buffer.
+ *
+ * @since 2.0
*/
- public short getChannelIndex();
+ public short getChannelID();
/**
* Returns the capacity of this buffer.
@@ -191,16 +194,16 @@ public interface IBuffer
* {@link ByteBuffer#capacity()}
* </ul>
*
- * @param channelIndex
+ * @param channelID
* The index of an {@link IChannel} that this buffer is intended to be passed to later or {@link #NO_CHANNEL}
* .
* @return A {@link ByteBuffer} that can be used for putting data.
* @throws IllegalStateException
* If the state of this buffer is not {@link BufferState#INITIAL INITIAL} ({@link BufferState#PUTTING
- * PUTTING} is allowed but meaningless if and only if the given <code>channelIndex</code> is equal to the
- * existing <code>channelIndex</code> of this buffer).
+ * PUTTING} is allowed but meaningless if and only if the given <code>channelID</code> is equal to the
+ * existing <code>channelID</code> of this buffer).
*/
- public ByteBuffer startPutting(short channelIndex) throws IllegalStateException;
+ public ByteBuffer startPutting(short channelID) throws IllegalStateException;
/**
* Tries to write the data of this buffer to a {@link SocketChannel}.
@@ -259,4 +262,14 @@ public interface IBuffer
public void clear();
public String formatContent(boolean showHeader);
+
+ /**
+ * @since 2.0
+ */
+ public IErrorHandler getErrorHandler();
+
+ /**
+ * @since 2.0
+ */
+ public void setErrorHandler(IErrorHandler errorHandler);
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalRemoteException.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelException.java
index 3fa69c6429..b666688d2a 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalRemoteException.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelException.java
@@ -8,31 +8,34 @@
* Contributors:
* Eike Stepper - initial API and implementation
**************************************************************************/
-package org.eclipse.net4j.signal;
+package org.eclipse.net4j.channel;
/**
+ * Thrown by an {@link IChannel} to indicate channel management problems.
+ *
+ * @see IChannelMultiplexer
* @author Eike Stepper
* @since 2.0
*/
-public class SignalRemoteException extends RuntimeException
+public class ChannelException extends RuntimeException
{
private static final long serialVersionUID = 1L;
- public SignalRemoteException()
+ public ChannelException()
{
}
- public SignalRemoteException(String message)
+ public ChannelException(String message)
{
super(message);
}
- public SignalRemoteException(Throwable cause)
+ public ChannelException(Throwable cause)
{
super(cause);
}
- public SignalRemoteException(String message, Throwable cause)
+ public ChannelException(String message, Throwable cause)
{
super(message, cause);
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelOutputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelOutputStream.java
index 236807c0a4..68a0257a41 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelOutputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelOutputStream.java
@@ -11,6 +11,7 @@
package org.eclipse.net4j.channel;
import org.eclipse.net4j.buffer.BufferOutputStream;
+import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferProvider;
/**
@@ -20,11 +21,11 @@ public class ChannelOutputStream extends BufferOutputStream
{
public ChannelOutputStream(IChannel channel)
{
- super(channel, channel.getIndex());
+ super(channel, channel.getID());
}
public ChannelOutputStream(IChannel channel, IBufferProvider bufferProvider)
{
- super(channel, bufferProvider, channel.getIndex());
+ super(channel, bufferProvider, channel == null ? IBuffer.NO_CHANNEL : channel.getID());
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java
index 4d6a3494d7..1118f9ec1e 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java
@@ -42,9 +42,9 @@ import org.eclipse.net4j.util.security.IUserAware;
* An example for opening a channel on an {@link IConnector} and sending an {@link IBuffer}:
* <p>
* <pre style="background-color:#ffffc8; border-width:1px; border-style:solid; padding:.5em;"> // Open a channel
- * IChannel channel = connector.openChannel(); short channelIndex = channel.getIndex(); // Fill a buffer Buffer buffer =
- * bufferProvider.getBuffer(); ByteBuffer byteBuffer = buffer.startPutting(channelIndex); byteBuffer.putDouble(15.47);
- * // Let the channel send the buffer without blocking channel.sendBuffer(buffer); </pre>
+ * IChannel channel = connector.openChannel(); short channelID = channel.getIndex(); // Fill a buffer Buffer buffer =
+ * bufferProvider.getBuffer(); ByteBuffer byteBuffer = buffer.startPutting(channelID); byteBuffer.putDouble(15.47); //
+ * Let the channel send the buffer without blocking channel.sendBuffer(buffer); </pre>
* <p>
* An example for receiving {@link IBuffer}s from channels on an {@link IConnector}:
* <p>
@@ -63,12 +63,12 @@ import org.eclipse.net4j.util.security.IUserAware;
public interface IChannel extends ILocationAware, IUserAware, IBufferHandler, INotifier
{
/**
- * Returns the index of this channel within the array of channels returned from the
- * {@link IChannelMultiplexer#getChannels() getChannels()} method of the multiplexer of this channel.
+ * Returns the ID of this channel. The ID is unique at any time among all channels of the associated
+ * {@link IChannelMultiplexer multiplexer}.
*
* @since 2.0
*/
- public short getIndex();
+ public short getID();
/**
* Returns the multiplexer this channel is associated with. This channel multiplexer can be used, for example, to open
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannelMultiplexer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannelMultiplexer.java
index ba4f9750c0..332c00f1b6 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannelMultiplexer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannelMultiplexer.java
@@ -12,14 +12,13 @@ package org.eclipse.net4j.channel;
import org.eclipse.net4j.ILocationAware;
import org.eclipse.net4j.buffer.IBufferHandler;
-import org.eclipse.net4j.connector.ConnectorException;
import org.eclipse.net4j.protocol.IProtocol;
import org.eclipse.net4j.util.container.IContainer;
import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.factory.IFactory;
import org.eclipse.net4j.util.lifecycle.ILifecycle;
-import java.util.List;
+import java.util.Collection;
/**
* @author Eike Stepper
@@ -37,18 +36,7 @@ public interface IChannelMultiplexer extends ILocationAware, IContainer<IChannel
*
* @since 2.0
*/
- public static final long DEFAULT_CHANNEL_TIMEOUT = -1L;
-
- /**
- * Returns a list of currently open channels. Note that the resulting list does not contain <code>null</code> values.
- * Generally the {@link IChannel#getIndex() index} of a channel <b>must not</b> be used as an index into this list.
- * Each call to this method creates a new copy of the internal channels array, so it can safely be modified bz the
- * caller.
- * <p>
- *
- * @since 2.0
- */
- public List<IChannel> getChannels();
+ public static final long DEFAULT_CHANNEL_TIMEOUT = -1;
/**
* Synchronous request to open a new {@link IChannel} with an undefined channel protocol. Since the peer connector
@@ -61,7 +49,7 @@ public interface IChannelMultiplexer extends ILocationAware, IContainer<IChannel
* @see #openChannel(IProtocol)
* @since 2.0
*/
- public IChannel openChannel() throws ConnectorException;
+ public IChannel openChannel() throws ChannelException;
/**
* Synchronous request to open a new {@link IChannel} with a channel protocol defined by a given protocol identifier.
@@ -73,7 +61,7 @@ public interface IChannelMultiplexer extends ILocationAware, IContainer<IChannel
* @see #openChannel(IProtocol)
* @since 2.0
*/
- public IChannel openChannel(String protocolID, Object infraStructure) throws ConnectorException;
+ public IChannel openChannel(String protocolID, Object infraStructure) throws ChannelException;
/**
* Synchronous request to open a new {@link IChannel} with the given channel protocol . The peer connector will lookup
@@ -85,7 +73,14 @@ public interface IChannelMultiplexer extends ILocationAware, IContainer<IChannel
* @see #openChannel(String, Object)
* @since 2.0
*/
- public IChannel openChannel(IProtocol<?> protocol) throws ConnectorException;
+ public IChannel openChannel(IProtocol<?> protocol) throws ChannelException;
+
+ /**
+ * Returns a collection of currently open channels.
+ *
+ * @since 2.0
+ */
+ public Collection<IChannel> getChannels();
/**
* @since 2.0
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java
new file mode 100644
index 0000000000..1f68c894d2
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java
@@ -0,0 +1,32 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.buffer.BufferInputStream;
+import org.eclipse.net4j.protocol.IProtocol;
+import org.eclipse.net4j.signal.failover.IFailOverStrategy;
+
+/**
+ * @author Eike Stepper
+ * @since 2.0
+ */
+public interface ISignalProtocol<INFRA_STRUCTURE> extends IProtocol<INFRA_STRUCTURE>
+{
+ public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT;
+
+ public long getTimeout();
+
+ public void setTimeout(long timeout);
+
+ public IFailOverStrategy getFailOverStrategy();
+
+ public void setFailOverStrategy(IFailOverStrategy failOverStrategy);
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java
index 7e7e9aacc6..3c98db4064 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java
@@ -12,79 +12,48 @@ package org.eclipse.net4j.signal;
import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.ReflectUtil;
-import org.eclipse.net4j.util.StringUtil;
-import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.io.InputStream;
/**
* @author Eike Stepper
*/
public abstract class Indication extends SignalReactor
{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, Indication.class);
-
/**
* @since 2.0
*/
- public Indication(SignalProtocol<?> protocol, short signalID)
+ public Indication(SignalProtocol<?> protocol, short id, String name)
{
- super(protocol, signalID);
+ super(protocol, id, name);
}
- @Override
- protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
+ /**
+ * @since 2.0
+ */
+ public Indication(SignalProtocol<?> protocol, short signalID)
{
- if (TRACER.isEnabled())
- {
- TRACER.trace("================ Indicating " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$
- }
-
- InputStream wrappedInputStream = wrapInputStream(in);
-
- try
- {
- indicating(ExtendedDataInputStream.wrap(wrappedInputStream));
- }
- catch (Error ex)
- {
- OM.LOG.error(ex);
- sendExceptionSignal(ex);
- throw ex;
- }
- catch (Exception ex)
- {
- ex = WrappedException.unwrap(ex);
- OM.LOG.error(ex);
- sendExceptionSignal(ex);
- throw ex;
- }
- finally
- {
- finishInputStream(wrappedInputStream);
- }
+ super(protocol, signalID);
}
- protected abstract void indicating(ExtendedDataInputStream in) throws Exception;
-
/**
* @since 2.0
*/
- protected String getMessage(Throwable t)
+ public Indication(SignalProtocol<?> protocol, Enum<?> literal)
+ {
+ super(protocol, literal);
+ }
+
+ @Override
+ protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
{
- return StringUtil.formatException(t);
+ doInput(in);
}
- void sendExceptionSignal(Throwable t) throws Exception
+ @Override
+ void doExtendedInput(ExtendedDataInputStream in) throws Exception
{
- SignalProtocol<?> protocol = getProtocol();
- int correlationID = -getCorrelationID();
- String message = getMessage(t);
- new RemoteExceptionRequest(protocol, correlationID, message, t).send();
+ indicating(in);
}
+
+ protected abstract void indicating(ExtendedDataInputStream in) throws Exception;
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java
new file mode 100644
index 0000000000..0381fb5f22
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java
@@ -0,0 +1,190 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.buffer.BufferInputStream;
+import org.eclipse.net4j.buffer.BufferOutputStream;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
+import org.eclipse.net4j.util.om.monitor.IMonitor;
+import org.eclipse.net4j.util.om.monitor.Monitor;
+import org.eclipse.net4j.util.om.monitor.MonitorCanceledException;
+
+import org.eclipse.internal.net4j.bundle.OM;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @author Eike Stepper
+ * @since 2.0
+ */
+public abstract class IndicationWithMonitoring extends IndicationWithResponse
+{
+ private Monitor monitor;
+
+ private long lastMonitorAccess;
+
+ /**
+ * @since 2.0
+ */
+ public IndicationWithMonitoring(SignalProtocol<?> protocol, short id, String name)
+ {
+ super(protocol, id, name);
+ }
+
+ /**
+ * @since 2.0
+ */
+ public IndicationWithMonitoring(SignalProtocol<?> protocol, short signalID)
+ {
+ super(protocol, signalID);
+ }
+
+ /**
+ * @since 2.0
+ */
+ public IndicationWithMonitoring(SignalProtocol<?> protocol, Enum<?> literal)
+ {
+ super(protocol, literal);
+ }
+
+ @Override
+ protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
+ {
+ try
+ {
+ super.execute(in, out);
+ }
+ finally
+ {
+ monitor = null;
+ }
+ }
+
+ @Override
+ protected final void indicating(ExtendedDataInputStream in) throws Exception
+ {
+ final long monitorProgressInterval = in.readLong();
+ ExecutorService executorService = getMonitoringExecutorService();
+ if (executorService != null)
+ {
+ monitor = new LastAccessMonitor();
+ setLastMonitorAccess();
+ executorService.execute(new Runnable()
+ {
+ public void run()
+ {
+ while (monitor != null)
+ {
+ if (System.currentTimeMillis() - lastMonitorAccess > monitorProgressInterval)
+ {
+ setMonitorCanceled();
+ break;
+ }
+
+ sendProgress(monitor.getTotalWork(), monitor.getWork());
+ ConcurrencyUtil.sleep(monitorProgressInterval);
+ }
+ }
+
+ private void sendProgress(int totalWork, int work)
+ {
+ try
+ {
+ new MonitorProgressRequest(getProtocol(), -getCorrelationID(), totalWork, work).sendAsync();
+ }
+ catch (Exception ex)
+ {
+ OM.LOG.error(ex);
+ }
+ }
+ });
+ }
+
+ indicating(in, monitor.fork(getIndicatingWorkPercent()));
+ }
+
+ @Override
+ protected final void responding(ExtendedDataOutputStream out) throws Exception
+ {
+ responding(out, monitor.fork(getRespondingWorkPercent()));
+ }
+
+ protected abstract void indicating(ExtendedDataInputStream in, IMonitor monitor) throws Exception;
+
+ protected abstract void responding(ExtendedDataOutputStream out, IMonitor monitor) throws Exception;
+
+ /**
+ * @since 2.0
+ */
+ protected ExecutorService getMonitoringExecutorService()
+ {
+ return getProtocol().getExecutorService();
+ }
+
+ protected int getIndicatingWorkPercent()
+ {
+ return 50;
+ }
+
+ protected int getRespondingWorkPercent()
+ {
+ return 50;
+ }
+
+ void setMonitorCanceled()
+ {
+ if (monitor != null)
+ {
+ monitor.cancel();
+ }
+ }
+
+ void setLastMonitorAccess()
+ {
+ lastMonitorAccess = System.currentTimeMillis();
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private final class LastAccessMonitor extends Monitor
+ {
+ @Override
+ public synchronized void begin(int totalWork)
+ {
+ setLastMonitorAccess();
+ super.begin(totalWork);
+ }
+
+ @Override
+ public synchronized void checkCanceled() throws MonitorCanceledException
+ {
+ setLastMonitorAccess();
+ super.checkCanceled();
+ }
+
+ @Override
+ public synchronized boolean isCanceled()
+ {
+ setLastMonitorAccess();
+ return super.isCanceled();
+ }
+
+ @Override
+ public synchronized void worked(int work)
+ {
+ setLastMonitorAccess();
+ super.worked(work);
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java
index 5c16fb3c29..76661e8aab 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java
@@ -12,21 +12,22 @@ package org.eclipse.net4j.signal;
import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.ReflectUtil;
-import org.eclipse.net4j.util.WrappedException;
+import org.eclipse.net4j.util.StringUtil;
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.io.OutputStream;
/**
* @author Eike Stepper
*/
-public abstract class IndicationWithResponse extends Indication
+public abstract class IndicationWithResponse extends SignalReactor
{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, IndicationWithResponse.class);
+ /**
+ * @since 2.0
+ */
+ public IndicationWithResponse(SignalProtocol<?> protocol, short id, String name)
+ {
+ super(protocol, id, name);
+ }
/**
* @since 2.0
@@ -36,47 +37,70 @@ public abstract class IndicationWithResponse extends Indication
super(protocol, signalID);
}
+ /**
+ * @since 2.0
+ */
+ public IndicationWithResponse(SignalProtocol<?> protocol, Enum<?> literal)
+ {
+ super(protocol, literal);
+ }
+
+ /**
+ * @since 2.0
+ */
+ protected String getExceptionMessage(Throwable t)
+ {
+ return StringUtil.formatException(t);
+ }
+
@Override
protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
{
- super.execute(in, out);
- if (TRACER.isEnabled())
- {
- TRACER.trace("================ Responding " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$
- }
-
- OutputStream wrappedOutputStream = wrapOutputStream(out);
+ boolean responding = false;
try
{
- responding(ExtendedDataOutputStream.wrap(wrappedOutputStream));
+ doInput(in);
+ responding = true;
+ doOutput(out);
}
catch (Error ex)
{
- OM.LOG.error(ex);
- sendExceptionSignal(ex);
+ sendExceptionSignal(ex, responding);
throw ex;
}
catch (Exception ex)
{
- ex = WrappedException.unwrap(ex);
- OM.LOG.error(ex);
- sendExceptionSignal(ex);
+ sendExceptionSignal(ex, responding);
throw ex;
}
- finally
- {
- finishOutputStream(wrappedOutputStream);
- }
-
- // End response
- out.flushWithEOS();
}
+ protected abstract void indicating(ExtendedDataInputStream in) throws Exception;
+
/**
* <b>Important Note:</b> The response must not be empty, i.e. the stream must be used at least to write a
* <code>boolean</code>. Otherwise synchronization problems will result!
- * @throws Exception TODO
*/
protected abstract void responding(ExtendedDataOutputStream out) throws Exception;
+
+ @Override
+ void doExtendedInput(ExtendedDataInputStream in) throws Exception
+ {
+ indicating(in);
+ }
+
+ @Override
+ void doExtendedOutput(ExtendedDataOutputStream out) throws Exception
+ {
+ responding(out);
+ }
+
+ void sendExceptionSignal(Throwable t, boolean responding) throws Exception
+ {
+ SignalProtocol<?> protocol = getProtocol();
+ int correlationID = -getCorrelationID();
+ String message = getExceptionMessage(t);
+ new RemoteExceptionRequest(protocol, correlationID, responding, message, t).sendAsync();
+ }
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledIndication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledIndication.java
new file mode 100644
index 0000000000..68f50becd6
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledIndication.java
@@ -0,0 +1,41 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
+
+import org.eclipse.internal.net4j.bundle.OM;
+
+/**
+ * @author Eike Stepper
+ */
+class MonitorCanceledIndication extends Indication
+{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, MonitorCanceledIndication.class);
+
+ public MonitorCanceledIndication(SignalProtocol<?> protocol)
+ {
+ super(protocol, SignalProtocol.SIGNAL_MONITOR_CANCELED);
+ }
+
+ @Override
+ protected void indicating(ExtendedDataInputStream in) throws Exception
+ {
+ int correlationID = in.readInt();
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Canceling monitor of signal {0}", correlationID);
+ }
+
+ getProtocol().handleMonitorCanceled(correlationID);
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledRequest.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledRequest.java
new file mode 100644
index 0000000000..075c43909c
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledRequest.java
@@ -0,0 +1,43 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
+
+import org.eclipse.internal.net4j.bundle.OM;
+
+/**
+ * @author Eike Stepper
+ */
+class MonitorCanceledRequest extends Request
+{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, MonitorCanceledRequest.class);
+
+ private int correlationID;
+
+ public MonitorCanceledRequest(SignalProtocol<?> protocol, int correlationID)
+ {
+ super(protocol, SignalProtocol.SIGNAL_MONITOR_CANCELED);
+ this.correlationID = correlationID;
+ }
+
+ @Override
+ protected void requesting(ExtendedDataOutputStream out) throws Exception
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Canceling monitor of signal {0}", correlationID);
+ }
+
+ out.writeInt(correlationID);
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressIndication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressIndication.java
new file mode 100644
index 0000000000..dbfa9c2412
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressIndication.java
@@ -0,0 +1,43 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
+
+import org.eclipse.internal.net4j.bundle.OM;
+
+/**
+ * @author Eike Stepper
+ */
+class MonitorProgressIndication extends Indication
+{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, MonitorProgressIndication.class);
+
+ public MonitorProgressIndication(SignalProtocol<?> protocol)
+ {
+ super(protocol, SignalProtocol.SIGNAL_MONITOR_PROGRESS);
+ }
+
+ @Override
+ protected void indicating(ExtendedDataInputStream in) throws Exception
+ {
+ int correlationID = in.readInt();
+ int totalWork = in.readInt();
+ int work = in.readInt();
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Progress of signal {0}: totalWork={1}, work={2}", correlationID, totalWork, work);
+ }
+
+ getProtocol().handleMonitorProgress(correlationID, totalWork, work);
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressRequest.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressRequest.java
new file mode 100644
index 0000000000..503a82df3e
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressRequest.java
@@ -0,0 +1,51 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
+
+import org.eclipse.internal.net4j.bundle.OM;
+
+/**
+ * @author Eike Stepper
+ */
+class MonitorProgressRequest extends Request
+{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, MonitorProgressRequest.class);
+
+ private int correlationID;
+
+ private int totalWork;
+
+ private int work;
+
+ public MonitorProgressRequest(SignalProtocol<?> protocol, int correlationID, int totalWork, int work)
+ {
+ super(protocol, SignalProtocol.SIGNAL_MONITOR_PROGRESS);
+ this.correlationID = correlationID;
+ this.totalWork = totalWork;
+ this.work = work;
+ }
+
+ @Override
+ protected void requesting(ExtendedDataOutputStream out) throws Exception
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Progress of signal {0}: totalWork={1}, work={2}", correlationID, totalWork, work);
+ }
+
+ out.writeInt(correlationID);
+ out.writeInt(totalWork);
+ out.writeInt(work);
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java
new file mode 100644
index 0000000000..76495612d3
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java
@@ -0,0 +1,39 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+/**
+ * @author Eike Stepper
+ * @since 2.0
+ */
+public class RemoteException extends RuntimeException
+{
+ private static final long serialVersionUID = 1L;
+
+ private boolean whileResponding;
+
+ public RemoteException(Throwable cause, boolean whileResponding)
+ {
+ super(cause);
+ this.whileResponding = whileResponding;
+ }
+
+ public RemoteException(String message, boolean whileResponding)
+ {
+ super(message);
+ this.whileResponding = whileResponding;
+ }
+
+ public boolean whileResponding()
+ {
+ return whileResponding;
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionIndication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionIndication.java
index de18a1b3bb..82218548bf 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionIndication.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionIndication.java
@@ -11,6 +11,7 @@
package org.eclipse.net4j.signal;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.internal.net4j.bundle.OM;
@@ -19,6 +20,10 @@ import org.eclipse.internal.net4j.bundle.OM;
*/
class RemoteExceptionIndication extends Indication
{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, RemoteExceptionIndication.class);
+
+ private Throwable t;
+
public RemoteExceptionIndication(SignalProtocol<?> protocol)
{
super(protocol, SignalProtocol.SIGNAL_REMOTE_EXCEPTION);
@@ -28,8 +33,13 @@ class RemoteExceptionIndication extends Indication
protected void indicating(ExtendedDataInputStream in) throws Exception
{
int correlationID = in.readInt();
+ boolean responding = in.readBoolean();
String message = in.readString();
- Throwable t;
+ if (TRACER.isEnabled())
+ {
+ String msg = RemoteExceptionRequest.getFirstLine(message);
+ TRACER.format("Reading remote exception for signal {0}: {1}", correlationID, msg);
+ }
try
{
@@ -37,9 +47,9 @@ class RemoteExceptionIndication extends Indication
}
catch (Throwable couldNotLoadExceptionClass)
{
- t = new SignalRemoteException(message);
+ t = new RemoteException(message, responding);
}
- getProtocol().stopSignal(correlationID, t);
+ getProtocol().handleRemoteException(correlationID, t, responding);
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionRequest.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionRequest.java
index 8dd6f083f0..9190517da4 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionRequest.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionRequest.java
@@ -11,31 +11,67 @@
package org.eclipse.net4j.signal;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
+
+import org.eclipse.internal.net4j.bundle.OM;
/**
* @author Eike Stepper
*/
class RemoteExceptionRequest extends Request
{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, RemoteExceptionRequest.class);
+
private int correlationID;
+ private boolean responding;
+
private String message;
private Throwable t;
- public RemoteExceptionRequest(SignalProtocol<?> protocol, int correlationID, String message, Throwable t)
+ public RemoteExceptionRequest(SignalProtocol<?> protocol, int correlationID, boolean responding, String message,
+ Throwable t)
{
super(protocol, SignalProtocol.SIGNAL_REMOTE_EXCEPTION);
this.correlationID = correlationID;
this.message = message;
this.t = t;
+ this.responding = responding;
}
@Override
protected void requesting(ExtendedDataOutputStream out) throws Exception
{
+ if (TRACER.isEnabled())
+ {
+ String msg = getFirstLine(message);
+ TRACER.format("Writing remote exception for signal {0}: {1}", correlationID, msg);
+ }
+
out.writeInt(correlationID);
+ out.writeBoolean(responding);
out.writeString(message);
out.writeObject(t);
}
+
+ public static String getFirstLine(String message)
+ {
+ if (message == null)
+ {
+ return null;
+ }
+
+ int nl = message.indexOf('\n');
+ if (nl == -1)
+ {
+ nl = message.length();
+ }
+ if (nl > 100)
+ {
+ nl = 100;
+ }
+
+ return message.substring(0, nl);
+ }
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java
index e79a162163..dcc3d3c75d 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java
@@ -12,20 +12,20 @@ package org.eclipse.net4j.signal;
import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.io.OutputStream;
/**
* @author Eike Stepper
*/
public abstract class Request extends SignalActor
{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, Request.class);
+ /**
+ * @since 2.0
+ */
+ public Request(SignalProtocol<?> protocol, short id, String name)
+ {
+ super(protocol, id, name);
+ }
/**
* @since 2.0
@@ -35,18 +35,33 @@ public abstract class Request extends SignalActor
super(protocol, signalID);
}
+ /**
+ * @since 2.0
+ */
+ public Request(SignalProtocol<?> protocol, Enum<?> literal)
+ {
+ super(protocol, literal);
+ }
+
+ /**
+ * @since 2.0
+ */
+ public void sendAsync() throws Exception
+ {
+ getProtocol().startSignal(this, getProtocol().getTimeout());
+
+ }
+
@Override
protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
{
- if (TRACER.isEnabled())
- {
- TRACER.trace("================ Requesting " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$
- }
-
- OutputStream wrappedOutputStream = wrapOutputStream(out);
- requesting(ExtendedDataOutputStream.wrap(wrappedOutputStream));
- finishOutputStream(wrappedOutputStream);
- out.flushWithEOS();
+ doOutput(out);
+ }
+
+ @Override
+ void doExtendedOutput(ExtendedDataOutputStream out) throws Exception
+ {
+ requesting(out);
}
protected abstract void requesting(ExtendedDataOutputStream out) throws Exception;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
index a5d47735ce..26d0798200 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
@@ -12,20 +12,27 @@ package org.eclipse.net4j.signal;
import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.io.InputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
/**
* @author Eike Stepper
*/
-public abstract class RequestWithConfirmation<RESULT> extends Request
+public abstract class RequestWithConfirmation<RESULT> extends SignalActor
{
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, RequestWithConfirmation.class);
+ private RESULT result;
+
+ /**
+ * @since 2.0
+ */
+ public RequestWithConfirmation(SignalProtocol<?> protocol, short id, String name)
+ {
+ super(protocol, id, name);
+ }
/**
* @since 2.0
@@ -35,56 +42,84 @@ public abstract class RequestWithConfirmation<RESULT> extends Request
super(protocol, signalID);
}
- @Override
- @SuppressWarnings("unchecked")
- public RESULT send() throws Exception, SignalRemoteException
+ /**
+ * @since 2.0
+ */
+ public RequestWithConfirmation(SignalProtocol<?> protocol, Enum<?> literal)
{
- return (RESULT)super.send();
+ super(protocol, literal);
}
- @Override
- @SuppressWarnings("unchecked")
- public RESULT send(long timeout) throws Exception, SignalRemoteException
+ /**
+ * @since 2.0
+ */
+ public Future<RESULT> sendAsync()
{
- return (RESULT)super.send(timeout);
+ ExecutorService executorService = getAsyncExecutorService();
+ return executorService.submit(new Callable<RESULT>()
+ {
+ public RESULT call() throws Exception
+ {
+ return send();
+ }
+ });
}
- @Override
- protected final void execute(BufferInputStream in, BufferOutputStream out) throws Exception
+ /**
+ * @since 2.0
+ */
+ public RESULT send() throws Exception, RemoteException
{
- super.execute(in, out);
- if (TRACER.isEnabled())
- {
- TRACER.trace("================ Confirming " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$
- }
+ return send(getProtocol().getTimeout());
+ }
+
+ /**
+ * @since 2.0
+ */
+ public RESULT send(long timeout) throws Exception, RemoteException
+ {
+ result = null;
+ getProtocol().startSignal(this, timeout);
+ return result;
+ }
- InputStream wrappedInputStream = wrapInputStream(in);
- RESULT result = confirming(ExtendedDataInputStream.wrap(wrappedInputStream));
- finishInputStream(wrappedInputStream);
- setResult(result);
+ /**
+ * @since 2.0
+ */
+ protected ExecutorService getAsyncExecutorService()
+ {
+ return getProtocol().getExecutorService();
}
+ @Override
+ protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
+ {
+ doOutput(out);
+ doInput(in);
+ }
+
+ protected abstract void requesting(ExtendedDataOutputStream out) throws Exception;
+
/**
* <b>Important Note:</b> The confirmation must not be empty, i.e. the stream must be used at least to read a
* <code>boolean</code>. Otherwise synchronization problems will result!
- *
- * @throws Exception
- * TODO
*/
protected abstract RESULT confirming(ExtendedDataInputStream in) throws Exception;
- void setRemoteException(Throwable t)
+ @Override
+ void doExtendedOutput(ExtendedDataOutputStream out) throws Exception
{
- SignalRemoteException remoteException;
- if (t instanceof SignalRemoteException)
- {
- remoteException = (SignalRemoteException)t;
- }
- else
- {
- remoteException = new SignalRemoteException(t);
- }
+ requesting(out);
+ }
- getBufferInputStream().setException(remoteException);
+ @Override
+ void doExtendedInput(ExtendedDataInputStream in) throws Exception
+ {
+ result = confirming(in);
+ }
+
+ void setRemoteException(Throwable t, boolean responding)
+ {
+ getBufferInputStream().setException(new RemoteException(t, responding));
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
new file mode 100644
index 0000000000..deafa7bc06
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
@@ -0,0 +1,254 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.buffer.BufferInputStream;
+import org.eclipse.net4j.buffer.BufferOutputStream;
+import org.eclipse.net4j.util.ImplementationError;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
+import org.eclipse.net4j.util.om.monitor.IMonitor;
+import org.eclipse.net4j.util.om.monitor.Monitor;
+
+import org.eclipse.internal.net4j.bundle.OM;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * @author Eike Stepper
+ * @since 2.0
+ */
+public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmation<RESULT>
+{
+ /**
+ * @since 2.0
+ */
+ public static final long DEFAULT_CANCELATION_POLL_INTERVAL = 100L;
+
+ /**
+ * @since 2.0
+ */
+ public static final long DEFAULT_MONITOR_PROGRESS_INTERVAL = 2000;
+
+ private IMonitor mainMonitor;
+
+ private IMonitor remoteMonitor;
+
+ /**
+ * @since 2.0
+ */
+ public RequestWithMonitoring(SignalProtocol<?> protocol, short id, String name)
+ {
+ super(protocol, id, name);
+ }
+
+ /**
+ * @since 2.0
+ */
+ public RequestWithMonitoring(SignalProtocol<?> protocol, short signalID)
+ {
+ super(protocol, signalID);
+ }
+
+ /**
+ * @since 2.0
+ */
+ public RequestWithMonitoring(SignalProtocol<?> protocol, Enum<?> literal)
+ {
+ super(protocol, literal);
+ }
+
+ @Override
+ public Future<RESULT> sendAsync()
+ {
+ return sendAsync(null);
+ }
+
+ @Override
+ public RESULT send() throws Exception, RemoteException
+ {
+ return send(null);
+ }
+
+ @Override
+ public RESULT send(long timeout) throws Exception, RemoteException
+ {
+ return send(timeout, null);
+ }
+
+ public Future<RESULT> sendAsync(IMonitor monitor)
+ {
+ initMainMonitor(monitor);
+ return super.sendAsync();
+ }
+
+ public RESULT send(IMonitor monitor) throws Exception, RemoteException
+ {
+ initMainMonitor(monitor);
+ return super.send();
+ }
+
+ public RESULT send(long timeout, IMonitor monitor) throws Exception, RemoteException
+ {
+ initMainMonitor(monitor);
+ return super.send(timeout);
+ }
+
+ @Override
+ protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
+ {
+ try
+ {
+ super.execute(in, out);
+ }
+ finally
+ {
+ remoteMonitor.done();
+ remoteMonitor = null;
+
+ mainMonitor.done();
+ mainMonitor = null;
+ }
+ }
+
+ @Override
+ protected final void requesting(ExtendedDataOutputStream out) throws Exception
+ {
+ int remoteWork = 100 - getRequestingWorkPercent() - getConfirmingWorkPercent();
+ if (remoteWork < 0)
+ {
+ throw new ImplementationError("Remote work must not be negative: " + remoteWork);
+ }
+
+ mainMonitor.begin(100);
+ remoteMonitor = mainMonitor.fork(remoteWork);
+
+ ExecutorService executorService = getCancelationExecutorService();
+ if (executorService != null)
+ {
+ executorService.execute(new Runnable()
+ {
+ public void run()
+ {
+ while (mainMonitor != null)
+ {
+ ConcurrencyUtil.sleep(getCancelationPollInterval());
+ if (mainMonitor != null && mainMonitor.isCanceled())
+ {
+ try
+ {
+ new MonitorCanceledRequest(getProtocol(), getCorrelationID()).sendAsync();
+ }
+ catch (Exception ex)
+ {
+ OM.LOG.error(ex);
+ }
+
+ return;
+ }
+ }
+ }
+ });
+ }
+
+ out.writeLong(getMonitorProgressInterval());
+ requesting(out, mainMonitor.fork(getRequestingWorkPercent()));
+ }
+
+ @Override
+ protected final RESULT confirming(ExtendedDataInputStream in) throws Exception
+ {
+ return confirming(in, mainMonitor.fork(getConfirmingWorkPercent()));
+ }
+
+ protected abstract void requesting(ExtendedDataOutputStream out, IMonitor monitor) throws Exception;
+
+ /**
+ * <b>Important Note:</b> The confirmation must not be empty, i.e. the stream must be used at least to read a
+ * <code>boolean</code>. Otherwise synchronization problems will result!
+ */
+ protected abstract RESULT confirming(ExtendedDataInputStream in, IMonitor monitor) throws Exception;
+
+ /**
+ * @since 2.0
+ */
+ protected ExecutorService getCancelationExecutorService()
+ {
+ return getProtocol().getExecutorService();
+ }
+
+ /**
+ * @since 2.0
+ */
+ protected long getCancelationPollInterval()
+ {
+ return DEFAULT_CANCELATION_POLL_INTERVAL;
+ }
+
+ /**
+ * @since 2.0
+ */
+ protected long getMonitorProgressInterval()
+ {
+ return DEFAULT_MONITOR_PROGRESS_INTERVAL;
+ }
+
+ /**
+ * @since 2.0
+ */
+ protected int getRequestingWorkPercent()
+ {
+ return 25;
+ }
+
+ /**
+ * @since 2.0
+ */
+ protected int getConfirmingWorkPercent()
+ {
+ return 25;
+ }
+
+ void setMonitorProgress(int totalWork, int work)
+ {
+ getBufferInputStream().restartTimeout();
+ if (remoteMonitor != null)
+ {
+ if (remoteMonitor.getTotalWork() == 0)
+ {
+ remoteMonitor.begin(totalWork);
+ remoteMonitor.worked(work);
+ }
+ else
+ {
+ float oldRatio = remoteMonitor.getWork();
+ oldRatio /= remoteMonitor.getTotalWork();
+
+ float newRatio = work;
+ newRatio /= totalWork;
+
+ float newWork = newRatio - oldRatio;
+ newWork *= remoteMonitor.getTotalWork();
+ if (newWork >= 1.0)
+ {
+ remoteMonitor.worked((int)newWork);
+ }
+ }
+ }
+ }
+
+ private void initMainMonitor(IMonitor monitor)
+ {
+ mainMonitor = monitor == null ? new Monitor() : monitor;
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
index 9567f6bfa3..be91752cca 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
@@ -12,6 +12,9 @@ package org.eclipse.net4j.signal;
import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.BufferOutputStream;
+import org.eclipse.net4j.util.ReflectUtil;
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.io.IORuntimeException;
import org.eclipse.net4j.util.om.trace.ContextTracer;
@@ -21,6 +24,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.text.MessageFormat;
import java.util.concurrent.TimeoutException;
/**
@@ -28,11 +32,18 @@ import java.util.concurrent.TimeoutException;
*/
public abstract class Signal implements Runnable
{
+ /**
+ * @since 2.0
+ */
+ public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT;
+
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, Signal.class);
private SignalProtocol<?> protocol;
- private short signalID;
+ private short id;
+
+ private String name;
private int correlationID;
@@ -48,10 +59,29 @@ public abstract class Signal implements Runnable
*
* @since 2.0
*/
- protected Signal(SignalProtocol<?> protocol, short signalID)
+ public Signal(SignalProtocol<?> protocol, short id, String name)
{
this.protocol = protocol;
- this.signalID = signalID;
+ this.id = id;
+ this.name = name;
+ }
+
+ /**
+ * @since 2.0
+ * @see #Signal(SignalProtocol, short, String)
+ */
+ public Signal(SignalProtocol<?> protocol, short id)
+ {
+ this(protocol, id, null);
+ }
+
+ /**
+ * @since 2.0
+ * @see #Signal(SignalProtocol, short, String)
+ */
+ public Signal(SignalProtocol<?> protocol, Enum<?> literal)
+ {
+ this(protocol, (short)literal.ordinal(), literal.name());
}
public SignalProtocol<?> getProtocol()
@@ -60,20 +90,71 @@ public abstract class Signal implements Runnable
}
/**
- * Returns the short integer ID of this signal.
+ * Returns the short integer ID of this signal that is unique among all signals of the associated
+ * {@link #getProtocol() protocol}.
*
* @since 2.0
*/
- public final short getSignalID()
+ public final short getID()
{
- return signalID;
+ return id;
}
- protected final int getCorrelationID()
+ /**
+ * @since 2.0
+ */
+ public String getName()
+ {
+ if (name == null)
+ {
+ // Needs no synchronization because any thread would set the same value.
+ name = ReflectUtil.getSimpleClassName(this);
+ }
+
+ return name;
+ }
+
+ /**
+ * @since 2.0
+ */
+ public final int getCorrelationID()
{
return correlationID;
}
+ @Override
+ public String toString()
+ {
+ return MessageFormat.format("Signal[protocol={0}, id={1}, name={2}, correlation={3}]", getProtocol().getType(),
+ getID(), getName(), getCorrelationID());
+ }
+
+ public final void run()
+ {
+ String threadName = null;
+ try
+ {
+ if (OM.SET_SIGNAL_THREAD_NAME)
+ {
+ threadName = getClass().getSimpleName();
+ Thread.currentThread().setName(threadName);
+ }
+
+ runSync();
+ }
+ catch (Exception ex)
+ {
+ OM.LOG.error(ex);
+ }
+ finally
+ {
+ if (threadName != null)
+ {
+ Thread.currentThread().setName(threadName + "(FINISHED)");
+ }
+ }
+ }
+
protected final BufferInputStream getBufferInputStream()
{
return bufferInputStream;
@@ -173,33 +254,9 @@ public abstract class Signal implements Runnable
}
}
- public final void run()
- {
- String threadName = null;
- try
- {
- if (OM.SET_SIGNAL_THREAD_NAME)
- {
- threadName = getClass().getSimpleName();
- Thread.currentThread().setName(threadName);
- }
-
- runSync();
- }
- catch (Exception ex)
- {
- OM.LOG.error(ex);
- }
- finally
- {
- if (threadName != null)
- {
- Thread.currentThread().setName(threadName + "(FINISHED)");
- }
- }
- }
+ protected abstract void execute(BufferInputStream in, BufferOutputStream out) throws Exception;
- protected void runSync() throws Exception
+ void runSync() throws Exception
{
try
{
@@ -224,8 +281,6 @@ public abstract class Signal implements Runnable
}
}
- protected abstract void execute(BufferInputStream in, BufferOutputStream out) throws Exception;
-
void setCorrelationID(int correlationID)
{
this.correlationID = correlationID;
@@ -240,4 +295,78 @@ public abstract class Signal implements Runnable
{
bufferOutputStream = outputStream;
}
+
+ void doOutput(BufferOutputStream out) throws Exception
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("================ {0}: {1}", getOutputMeaning(), this); //$NON-NLS-1$
+ }
+
+ OutputStream wrappedOutputStream = wrapOutputStream(out);
+ ExtendedDataOutputStream extended = ExtendedDataOutputStream.wrap(wrappedOutputStream);
+
+ try
+ {
+ doExtendedOutput(extended);
+ }
+ catch (Error ex)
+ {
+ OM.LOG.error(ex);
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ OM.LOG.error(ex);
+ throw ex;
+ }
+ finally
+ {
+ finishOutputStream(wrappedOutputStream);
+ }
+
+ out.flushWithEOS();
+ }
+
+ void doInput(BufferInputStream in) throws Exception
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("================ {0}: {1}", getInputMeaning(), this); //$NON-NLS-1$
+ }
+
+ InputStream wrappedInputStream = wrapInputStream(in);
+ ExtendedDataInputStream extended = ExtendedDataInputStream.wrap(wrappedInputStream);
+
+ try
+ {
+ doExtendedInput(extended);
+ }
+ catch (Error ex)
+ {
+ OM.LOG.error(ex);
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ OM.LOG.error(ex);
+ throw ex;
+ }
+ finally
+ {
+ finishInputStream(wrappedInputStream);
+ }
+ }
+
+ void doExtendedOutput(ExtendedDataOutputStream out) throws Exception
+ {
+ }
+
+ void doExtendedInput(ExtendedDataInputStream in) throws Exception
+ {
+ }
+
+ abstract String getOutputMeaning();
+
+ abstract String getInputMeaning();
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
index 7e30b589ae..a46333efd9 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
@@ -10,57 +10,47 @@
**************************************************************************/
package org.eclipse.net4j.signal;
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.util.ReflectUtil;
-
-import java.text.MessageFormat;
-
/**
* @author Eike Stepper
*/
public abstract class SignalActor extends Signal
{
- public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT;
-
- private boolean terminated;
-
- private Object result;
-
/**
* @since 2.0
*/
- public SignalActor(SignalProtocol<?> protocol, short signalID)
+ public SignalActor(SignalProtocol<?> protocol, short id, String name)
{
- super(protocol, signalID);
+ super(protocol, id, name);
setCorrelationID(protocol.getNextCorrelationID());
}
- public Object send() throws Exception, SignalRemoteException
+ /**
+ * @since 2.0
+ */
+ public SignalActor(SignalProtocol<?> protocol, short id)
{
- return send(NO_TIMEOUT);
+ super(protocol, id);
+ setCorrelationID(protocol.getNextCorrelationID());
}
- public Object send(long timeout) throws Exception, SignalRemoteException
+ /**
+ * @since 2.0
+ */
+ public SignalActor(SignalProtocol<?> protocol, Enum<?> literal)
{
- if (terminated)
- {
- throw new IllegalStateException("Terminated"); //$NON-NLS-1$
- }
-
- getProtocol().startSignal(this, timeout);
- terminated = true;
- return result;
+ super(protocol, literal);
+ setCorrelationID(protocol.getNextCorrelationID());
}
@Override
- public String toString()
+ String getInputMeaning()
{
- return MessageFormat.format("{0}[{1}, {2}, correlation={3} {4}]", ReflectUtil.getSimpleName(getClass()),
- getSignalID(), getProtocol(), getCorrelationID(), terminated ? "SENT" : "UNSENT");
+ return "Confirming";
}
- protected void setResult(Object result)
+ @Override
+ String getOutputMeaning()
{
- this.result = result;
+ return "Requesting";
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
index 5c2fb306f4..a19c91d30f 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
@@ -16,7 +16,8 @@ import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.channel.ChannelOutputStream;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.connector.IConnector;
-import org.eclipse.net4j.protocol.Protocol;
+import org.eclipse.net4j.signal.failover.IFailOverStrategy;
+import org.eclipse.net4j.util.io.IORuntimeException;
import org.eclipse.net4j.util.io.IStreamWrapper;
import org.eclipse.net4j.util.io.StreamWrapperChain;
import org.eclipse.net4j.util.om.trace.ContextTracer;
@@ -24,27 +25,37 @@ import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.internal.net4j.bundle.OM;
import org.eclipse.spi.net4j.InternalConnector;
+import org.eclipse.spi.net4j.Protocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
+import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* @author Eike Stepper
*/
-public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE>
+public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> implements
+ ISignalProtocol<INFRA_STRUCTURE>
{
- public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT;
-
/**
* @since 2.0
*/
public static final short SIGNAL_REMOTE_EXCEPTION = -1;
+ /**
+ * @since 2.0
+ */
+ public static final short SIGNAL_MONITOR_CANCELED = -2;
+
+ /**
+ * @since 2.0
+ */
+ public static final short SIGNAL_MONITOR_PROGRESS = -3;
+
private static final int MIN_CORRELATION_ID = 1;
private static final int MAX_CORRELATION_ID = Integer.MAX_VALUE;
@@ -53,9 +64,13 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
private static final ContextTracer STREAM_TRACER = new ContextTracer(OM.DEBUG_BUFFER_STREAM, SignalProtocol.class);
+ private long timeout = NO_TIMEOUT;
+
private IStreamWrapper streamWrapper;
- private Map<Integer, Signal> signals = new ConcurrentHashMap<Integer, Signal>(0);
+ private IFailOverStrategy failOverStrategy;
+
+ private Map<Integer, Signal> signals = new HashMap<Integer, Signal>();
private int nextCorrelationID = MIN_CORRELATION_ID;
@@ -66,6 +81,22 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
/**
* @since 2.0
*/
+ public long getTimeout()
+ {
+ return timeout;
+ }
+
+ /**
+ * @since 2.0
+ */
+ public void setTimeout(long timeout)
+ {
+ this.timeout = timeout;
+ }
+
+ /**
+ * @since 2.0
+ */
public IChannel open(IConnector connector)
{
InternalConnector conn = (InternalConnector)connector;
@@ -107,6 +138,22 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
}
}
+ /**
+ * @since 2.0
+ */
+ public IFailOverStrategy getFailOverStrategy()
+ {
+ return failOverStrategy;
+ }
+
+ /**
+ * @since 2.0
+ */
+ public void setFailOverStrategy(IFailOverStrategy failOverStrategy)
+ {
+ this.failOverStrategy = failOverStrategy;
+ }
+
public boolean waitForSignals(long timeout)
{
synchronized (signals)
@@ -189,7 +236,7 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
signal = provideSignalReactor(signalID);
signal.setCorrelationID(-correlationID);
- signal.setBufferInputStream(new SignalInputStream(getInputStreamTimeout()));
+ signal.setBufferInputStream(new SignalInputStream(getTimeout()));
signal.setBufferOutputStream(new SignalOutputStream(-correlationID, signalID, false));
signals.put(-correlationID, signal);
getExecutorService().execute(signal);
@@ -218,11 +265,6 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
}
}
- public long getInputStreamTimeout()
- {
- return NO_TIMEOUT;
- }
-
@Override
public String toString()
{
@@ -245,18 +287,26 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
protected final SignalReactor provideSignalReactor(short signalID)
{
checkActive();
- if (signalID == SIGNAL_REMOTE_EXCEPTION)
+ switch (signalID)
{
+ case SIGNAL_REMOTE_EXCEPTION:
return new RemoteExceptionIndication(this);
- }
- SignalReactor signal = createSignalReactor(signalID);
- if (signal == null)
- {
- throw new IllegalArgumentException("Invalid signalID " + signalID);
- }
+ case SIGNAL_MONITOR_CANCELED:
+ return new MonitorCanceledIndication(this);
- return signal;
+ case SIGNAL_MONITOR_PROGRESS:
+ return new MonitorProgressIndication(this);
+
+ default:
+ SignalReactor signal = createSignalReactor(signalID);
+ if (signal == null)
+ {
+ throw new IllegalArgumentException("Invalid signalID " + signalID);
+ }
+
+ return signal;
+ }
}
/**
@@ -292,10 +342,10 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
throw new IllegalArgumentException("signalActor.getProtocol() != this"); //$NON-NLS-1$
}
- short signalID = signalActor.getSignalID();
+ short signalID = signalActor.getID();
int correlationID = signalActor.getCorrelationID();
- signalActor.setBufferInputStream(new SignalInputStream(timeout));
signalActor.setBufferOutputStream(new SignalOutputStream(correlationID, signalID, true));
+ signalActor.setBufferInputStream(new SignalInputStream(timeout));
synchronized (signals)
{
signals.put(correlationID, signalActor);
@@ -314,7 +364,7 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
}
}
- void stopSignal(int correlationID, Throwable t)
+ void handleRemoteException(int correlationID, Throwable t, boolean responding)
{
synchronized (signals)
{
@@ -322,13 +372,39 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
if (signal instanceof RequestWithConfirmation)
{
RequestWithConfirmation<?> request = (RequestWithConfirmation<?>)signal;
- request.setRemoteException(t);
+ request.setRemoteException(t, responding);
}
signals.notifyAll();
}
}
+ void handleMonitorProgress(int correlationID, int totalWork, int work)
+ {
+ synchronized (signals)
+ {
+ Signal signal = signals.get(correlationID);
+ if (signal instanceof RequestWithMonitoring)
+ {
+ RequestWithMonitoring<?> request = (RequestWithMonitoring<?>)signal;
+ request.setMonitorProgress(totalWork, work);
+ }
+ }
+ }
+
+ void handleMonitorCanceled(int correlationID)
+ {
+ synchronized (signals)
+ {
+ Signal signal = signals.get(correlationID);
+ if (signal instanceof IndicationWithMonitoring)
+ {
+ IndicationWithMonitoring indication = (IndicationWithMonitoring)signal;
+ indication.setMonitorCanceled();
+ }
+ }
+ }
+
/**
* @author Eike Stepper
*/
@@ -368,8 +444,14 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
public IBuffer provideBuffer()
{
+ IChannel channel = getChannel();
+ if (channel == null)
+ {
+ throw new IORuntimeException("No channel for protocol " + SignalProtocol.this);
+ }
+
IBuffer buffer = delegate.provideBuffer();
- ByteBuffer byteBuffer = buffer.startPutting(getChannel().getIndex());
+ ByteBuffer byteBuffer = buffer.startPutting(channel.getID());
if (STREAM_TRACER.isEnabled())
{
STREAM_TRACER.trace("Providing buffer for correlation " + correlationID); //$NON-NLS-1$
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java
index 962af81478..ba62a6eb85 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java
@@ -10,8 +10,6 @@
**************************************************************************/
package org.eclipse.net4j.signal;
-import org.eclipse.net4j.util.ReflectUtil;
-
/**
* @author Eike Stepper
*/
@@ -20,15 +18,36 @@ public abstract class SignalReactor extends Signal
/**
* @since 2.0
*/
- protected SignalReactor(SignalProtocol<?> protocol, short signalID)
+ public SignalReactor(SignalProtocol<?> protocol, short id, String name)
+ {
+ super(protocol, id, name);
+ }
+
+ /**
+ * @since 2.0
+ */
+ public SignalReactor(SignalProtocol<?> protocol, short signalID)
{
super(protocol, signalID);
}
+ /**
+ * @since 2.0
+ */
+ public SignalReactor(SignalProtocol<?> protocol, Enum<?> literal)
+ {
+ super(protocol, literal);
+ }
+
+ @Override
+ String getInputMeaning()
+ {
+ return "Indicating";
+ }
+
@Override
- public String toString()
+ String getOutputMeaning()
{
- return ReflectUtil.getSimpleName(getClass()) + "[" + getSignalID() + ", " + getProtocol() + ", correlation=" //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- + getCorrelationID() + "]"; //$NON-NLS-1$
+ return "Responding";
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/AbstractFailOverStrategy.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/AbstractFailOverStrategy.java
index ff782e1d45..617202a41c 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/AbstractFailOverStrategy.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/AbstractFailOverStrategy.java
@@ -10,9 +10,8 @@
**************************************************************************/
package org.eclipse.net4j.signal.failover;
+import org.eclipse.net4j.signal.RemoteException;
import org.eclipse.net4j.signal.RequestWithConfirmation;
-import org.eclipse.net4j.signal.SignalActor;
-import org.eclipse.net4j.signal.SignalRemoteException;
import org.eclipse.net4j.util.event.Notifier;
/**
@@ -21,7 +20,7 @@ import org.eclipse.net4j.util.event.Notifier;
*/
public abstract class AbstractFailOverStrategy extends Notifier implements IFailOverStrategy
{
- private long defaultTimeout = SignalActor.NO_TIMEOUT;
+ private long defaultTimeout = RequestWithConfirmation.NO_TIMEOUT;
public AbstractFailOverStrategy()
{
@@ -37,7 +36,7 @@ public abstract class AbstractFailOverStrategy extends Notifier implements IFail
this.defaultTimeout = defaultTimeout;
}
- public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request) throws Exception, SignalRemoteException
+ public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request) throws Exception, RemoteException
{
return send(request, defaultTimeout);
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/FailOverStrategy.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/FailOverStrategy.java
index 242824fcc6..32b12ec620 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/FailOverStrategy.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/FailOverStrategy.java
@@ -12,9 +12,9 @@ package org.eclipse.net4j.signal.failover;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.connector.IConnector;
+import org.eclipse.net4j.signal.RemoteException;
import org.eclipse.net4j.signal.RequestWithConfirmation;
import org.eclipse.net4j.signal.SignalProtocol;
-import org.eclipse.net4j.signal.SignalRemoteException;
import org.eclipse.net4j.util.CheckUtil;
import java.util.concurrent.TimeoutException;
@@ -28,8 +28,7 @@ public abstract class FailOverStrategy extends AbstractFailOverStrategy
{
}
- public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception,
- SignalRemoteException
+ public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception, RemoteException
{
for (;;)
{
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/IFailOverStrategy.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/IFailOverStrategy.java
index 245e57663a..62fc021bc6 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/IFailOverStrategy.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/IFailOverStrategy.java
@@ -10,8 +10,8 @@
**************************************************************************/
package org.eclipse.net4j.signal.failover;
+import org.eclipse.net4j.signal.RemoteException;
import org.eclipse.net4j.signal.RequestWithConfirmation;
-import org.eclipse.net4j.signal.SignalRemoteException;
import org.eclipse.net4j.util.event.INotifier;
/**
@@ -29,8 +29,7 @@ public interface IFailOverStrategy extends INotifier
*/
public void setDefaultTimeout(long defaultTimeout);
- public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request) throws Exception, SignalRemoteException;
+ public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request) throws Exception, RemoteException;
- public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception,
- SignalRemoteException;
+ public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception, RemoteException;
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/NOOPFailOverStrategy.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/NOOPFailOverStrategy.java
index 689675c1aa..79b18c5cc5 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/NOOPFailOverStrategy.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/NOOPFailOverStrategy.java
@@ -10,8 +10,8 @@
**************************************************************************/
package org.eclipse.net4j.signal.failover;
+import org.eclipse.net4j.signal.RemoteException;
import org.eclipse.net4j.signal.RequestWithConfirmation;
-import org.eclipse.net4j.signal.SignalRemoteException;
import org.eclipse.net4j.util.event.IListener;
/**
@@ -23,8 +23,7 @@ public class NOOPFailOverStrategy extends AbstractFailOverStrategy
{
}
- public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception,
- SignalRemoteException
+ public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception, RemoteException
{
return request.send(timeout);
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/acceptor/Acceptor.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Acceptor.java
index 6d89a692b6..b63f3f2a10 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/acceptor/Acceptor.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Acceptor.java
@@ -8,7 +8,7 @@
* Contributors:
* Eike Stepper - initial API and implementation
**************************************************************************/
-package org.eclipse.internal.net4j.acceptor;
+package org.eclipse.spi.net4j;
import org.eclipse.net4j.ITransportConfig;
import org.eclipse.net4j.connector.IConnector;
@@ -23,14 +23,12 @@ import org.eclipse.net4j.util.security.INegotiator;
import org.eclipse.internal.net4j.TransportConfig;
import org.eclipse.internal.net4j.bundle.OM;
-import org.eclipse.spi.net4j.InternalAcceptor;
-import org.eclipse.spi.net4j.InternalConnector;
-
import java.util.HashSet;
import java.util.Set;
/**
* @author Eike Stepper
+ * @since 2.0
*/
public abstract class Acceptor extends Container<IConnector> implements InternalAcceptor
{
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
index 23cd94753b..4bb4f65100 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/Channel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
@@ -8,25 +8,20 @@
* Contributors:
* Eike Stepper - initial API and implementation
**************************************************************************/
-package org.eclipse.internal.net4j.channel;
+package org.eclipse.spi.net4j;
import org.eclipse.net4j.buffer.BufferState;
import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferHandler;
import org.eclipse.net4j.channel.IChannelMultiplexer;
-import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
import org.eclipse.net4j.util.concurrent.IWorkSerializer;
import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer;
import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer;
import org.eclipse.net4j.util.lifecycle.Lifecycle;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.internal.net4j.bundle.OM;
-import org.eclipse.spi.net4j.InternalChannel;
-import org.eclipse.spi.net4j.InternalChannelMultiplexer;
-
import java.text.MessageFormat;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -34,6 +29,7 @@ import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
+ * @since 2.0
*/
public class Channel extends Lifecycle implements InternalChannel
{
@@ -43,7 +39,7 @@ public class Channel extends Lifecycle implements InternalChannel
private InternalChannelMultiplexer channelMultiplexer;
- private short channelIndex = IBuffer.NO_CHANNEL;
+ private short id = IBuffer.NO_CHANNEL;
private ExecutorService receiveExecutor;
@@ -56,9 +52,6 @@ public class Channel extends Lifecycle implements InternalChannel
private transient Queue<IBuffer> sendQueue;
- @ExcludeFromDump
- private transient boolean inverseClosed;
-
public Channel()
{
}
@@ -98,19 +91,15 @@ public class Channel extends Lifecycle implements InternalChannel
this.channelMultiplexer = (InternalChannelMultiplexer)channelMultiplexer;
}
- public short getIndex()
+ public short getID()
{
- return channelIndex;
+ return id;
}
- public void setChannelIndex(short channelIndex)
+ public void setID(short id)
{
- if (channelIndex == IBuffer.NO_CHANNEL)
- {
- throw new IllegalArgumentException("channelIndex == INVALID_CHANNEL_ID"); //$NON-NLS-1$
- }
-
- this.channelIndex = channelIndex;
+ checkArg(id != IBuffer.NO_CHANNEL, "id == IBuffer.NO_CHANNEL"); //$NON-NLS-1$
+ this.id = id;
}
public ExecutorService getReceiveExecutor()
@@ -154,7 +143,7 @@ public class Channel extends Lifecycle implements InternalChannel
if (TRACER.isEnabled())
{
- TRACER.format("Handling buffer from client: {0} --> {1}", buffer, this); //$NON-NLS-1$
+ TRACER.format("Handling buffer: {0} --> {1}", buffer, this); //$NON-NLS-1$
}
if (sendQueue == null)
@@ -214,14 +203,14 @@ public class Channel extends Lifecycle implements InternalChannel
@Override
public String toString()
{
- return MessageFormat.format("Channel[{0}, {1}]", channelIndex, getLocation()); //$NON-NLS-1$
+ return MessageFormat.format("Channel[{0}, {1}]", id, getLocation()); //$NON-NLS-1$
}
@Override
protected void doBeforeActivate() throws Exception
{
super.doBeforeActivate();
- checkState(channelIndex != IBuffer.NO_CHANNEL, "channelIndex == NO_CHANNEL"); //$NON-NLS-1$
+ checkState(id != IBuffer.NO_CHANNEL, "channelID == NO_CHANNEL"); //$NON-NLS-1$
checkState(channelMultiplexer, "channelMultiplexer"); //$NON-NLS-1$
}
@@ -257,24 +246,7 @@ public class Channel extends Lifecycle implements InternalChannel
@Override
protected void doDeactivate() throws Exception
{
- if (!inverseClosed)
- {
- channelMultiplexer.closeChannel(this);
- }
-
- super.doDeactivate();
- }
-
- public void finishDeactivate(boolean inverse)
- {
- inverseClosed = inverse;
- if (inverse)
- {
- LifecycleUtil.deactivate(receiveHandler);
- deactivate();
- }
-
- receiveHandler = null;
+ channelMultiplexer.closeChannel(this);
if (receiveSerializer != null)
{
receiveSerializer.dispose();
@@ -286,6 +258,8 @@ public class Channel extends Lifecycle implements InternalChannel
sendQueue.clear();
sendQueue = null;
}
+
+ super.doDeactivate();
}
public void close()
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
new file mode 100644
index 0000000000..5a85aeeee5
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
@@ -0,0 +1,360 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.spi.net4j;
+
+import org.eclipse.net4j.ITransportConfig;
+import org.eclipse.net4j.buffer.IBuffer;
+import org.eclipse.net4j.channel.ChannelException;
+import org.eclipse.net4j.channel.IChannel;
+import org.eclipse.net4j.channel.IChannelMultiplexer;
+import org.eclipse.net4j.protocol.IProtocol;
+import org.eclipse.net4j.protocol.IProtocolProvider;
+import org.eclipse.net4j.util.StringUtil;
+import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
+import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException;
+import org.eclipse.net4j.util.container.Container;
+import org.eclipse.net4j.util.factory.FactoryKey;
+import org.eclipse.net4j.util.factory.IFactoryKey;
+import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
+import org.eclipse.net4j.util.security.INegotiationContext;
+
+import org.eclipse.internal.net4j.TransportConfig;
+import org.eclipse.internal.net4j.bundle.OM;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * @author Eike Stepper
+ * @since 2.0
+ */
+public abstract class ChannelMultiplexer extends Container<IChannel> implements InternalChannelMultiplexer
+{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, ChannelMultiplexer.class);
+
+ private ITransportConfig config;
+
+ private long channelTimeout = IChannelMultiplexer.DEFAULT_CHANNEL_TIMEOUT;
+
+ @ExcludeFromDump
+ private transient ConcurrentMap<Short, IChannel> channels = new ConcurrentHashMap<Short, IChannel>();
+
+ @ExcludeFromDump
+ private transient Set<Short> channelIDs = new HashSet<Short>();
+
+ @ExcludeFromDump
+ private transient int lastChannelID;
+
+ public ChannelMultiplexer()
+ {
+ }
+
+ public synchronized ITransportConfig getConfig()
+ {
+ if (config == null)
+ {
+ config = new TransportConfig();
+ }
+
+ return config;
+ }
+
+ public synchronized void setConfig(ITransportConfig config)
+ {
+ checkInactive();
+ this.config = config;
+ }
+
+ public long getChannelTimeout()
+ {
+ if (channelTimeout == IChannelMultiplexer.DEFAULT_CHANNEL_TIMEOUT)
+ {
+ return OM.BUNDLE.getDebugSupport().getDebugOption("channel.timeout", 10000);
+ }
+
+ return channelTimeout;
+ }
+
+ public void setChannelTimeout(long channelTimeout)
+ {
+ this.channelTimeout = channelTimeout;
+ }
+
+ public final InternalChannel getChannel(short channelID)
+ {
+ return (InternalChannel)channels.get(channelID);
+ }
+
+ public final Collection<IChannel> getChannels()
+ {
+ return channels.values();
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return channels.isEmpty();
+ }
+
+ public IChannel[] getElements()
+ {
+ List<IChannel> list = new ArrayList<IChannel>(getChannels());
+ return list.toArray(new IChannel[list.size()]);
+ }
+
+ public InternalChannel openChannel() throws ChannelException
+ {
+ return openChannel((IProtocol<?>)null);
+ }
+
+ public InternalChannel openChannel(String protocolID, Object infraStructure) throws ChannelException
+ {
+ IProtocol<?> protocol = createProtocol(protocolID, infraStructure);
+ if (protocol == null)
+ {
+ throw new IllegalArgumentException("Unknown protocolID: " + protocolID);
+ }
+
+ return openChannel(protocol);
+ }
+
+ public InternalChannel openChannel(IProtocol<?> protocol) throws ChannelException
+ {
+ InternalChannel channel = createChannel();
+ initChannel(channel, protocol);
+ channel.setID(getNextChannelID());
+ addChannel(channel);
+
+ try
+ {
+ try
+ {
+ registerChannelWithPeer(channel.getID(), getChannelTimeout(), protocol);
+ }
+ catch (TimeoutRuntimeException ex)
+ {
+ // Adjust the message for the complete timeout time
+ throw new TimeoutRuntimeException("Registration timeout after " + getChannelTimeout() + " milliseconds");
+ }
+ }
+ catch (ChannelException ex)
+ {
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ throw new ChannelException(ex);
+ }
+
+ return channel;
+ }
+
+ public InternalChannel inverseOpenChannel(short channelID, String protocolID)
+ {
+ IProtocol<?> protocol = createProtocol(protocolID, null);
+
+ InternalChannel channel = createChannel();
+ initChannel(channel, protocol);
+ channel.setID(channelID);
+ addChannel(channel);
+ return channel;
+ }
+
+ public void closeChannel(InternalChannel channel) throws ChannelException
+ {
+ InternalChannel internalChannel = channel;
+ deregisterChannelFromPeer(internalChannel, getChannelTimeout());
+ removeChannel(internalChannel);
+ }
+
+ public void inverseCloseChannel(short channelID) throws ChannelException
+ {
+ InternalChannel channel = getChannel(channelID);
+ LifecycleUtil.deactivate(channel);
+ }
+
+ protected InternalChannel createChannel()
+ {
+ return new Channel();
+ }
+
+ protected void initChannel(InternalChannel channel, IProtocol<?> protocol)
+ {
+ channel.setMultiplexer(this);
+ channel.setReceiveExecutor(getConfig().getReceiveExecutor());
+ // channel.setUserID(getUserID());
+ if (protocol != null)
+ {
+ protocol.setChannel(channel);
+ LifecycleUtil.activate(protocol);
+ if (TRACER.isEnabled())
+ {
+ String protocolType = protocol == null ? null : protocol.getType();
+ TRACER.format("Opening channel with protocol {0}", protocolType);
+ }
+
+ channel.setReceiveHandler(protocol);
+ }
+ else
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Opening channel without protocol");
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <INFRA_STRUCTURE> IProtocol<INFRA_STRUCTURE> createProtocol(String type, INFRA_STRUCTURE infraStructure)
+ {
+ if (StringUtil.isEmpty(type))
+ {
+ return null;
+ }
+
+ IProtocolProvider protocolProvider = getConfig().getProtocolProvider();
+ if (protocolProvider == null)
+ {
+ throw new ChannelException("No protocol provider configured");
+ }
+
+ IProtocol<INFRA_STRUCTURE> protocol = (IProtocol<INFRA_STRUCTURE>)protocolProvider.getProtocol(type);
+ if (protocol == null)
+ {
+ throw new ChannelException("Invalid protocol factory: " + type);
+ }
+
+ protocol.setBufferProvider(getConfig().getBufferProvider());
+ protocol.setExecutorService(getConfig().getReceiveExecutor());
+ if (infraStructure != null)
+ {
+ protocol.setInfraStructure(infraStructure);
+ }
+
+ return protocol;
+ }
+
+ protected IFactoryKey createProtocolFactoryKey(String type)
+ {
+ switch (getLocation())
+ {
+ case SERVER:
+ return new FactoryKey(ServerProtocolFactory.PRODUCT_GROUP, type);
+ case CLIENT:
+ return new FactoryKey(ClientProtocolFactory.PRODUCT_GROUP, type);
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ @Override
+ protected boolean isDeferredActivation()
+ {
+ return true;
+ }
+
+ @Override
+ protected void doDeactivate() throws Exception
+ {
+ synchronized (channelIDs)
+ {
+ for (IChannel channel : getChannels())
+ {
+ LifecycleUtil.deactivate(channel);
+ }
+
+ channels.clear();
+ }
+
+ super.doDeactivate();
+ }
+
+ protected abstract INegotiationContext createNegotiationContext();
+
+ protected abstract void registerChannelWithPeer(short channelID, long timeout, IProtocol<?> protocol)
+ throws ChannelException;
+
+ protected abstract void deregisterChannelFromPeer(InternalChannel channel, long timeout) throws ChannelException;
+
+ private short getNextChannelID()
+ {
+ synchronized (channelIDs)
+ {
+ int start = lastChannelID;
+ int maxValue = Short.MAX_VALUE;
+ for (;;)
+ {
+ ++lastChannelID;
+ if (lastChannelID == start)
+ {
+ throw new ChannelException("Too many channels");
+ }
+
+ if (lastChannelID > maxValue)
+ {
+ lastChannelID = 1;
+ }
+
+ short id = (short)(isClient() ? lastChannelID : -lastChannelID);
+ if (channelIDs.add(id))
+ {
+ return id;
+ }
+ }
+ }
+ }
+
+ private void addChannel(InternalChannel channel)
+ {
+ short channelID = channel.getID();
+ if (channelID == IBuffer.CONTROL_CHANNEL || channelID == IBuffer.NO_CHANNEL)
+ {
+ throw new ChannelException("Invalid channel ID: " + channelID);
+ }
+
+ channels.put(channelID, channel);
+ LifecycleUtil.activate(channel);
+ fireElementAddedEvent(channel);
+ }
+
+ private void removeChannel(InternalChannel channel)
+ {
+ try
+ {
+ short channelID = channel.getID();
+ boolean removed;
+ synchronized (channelIDs)
+ {
+ removed = channels.remove(channelID) != null;
+ if (removed)
+ {
+ channelIDs.remove(channelID);
+ }
+ }
+
+ if (removed)
+ {
+ fireElementRemovedEvent(channel);
+ }
+ }
+ catch (RuntimeException ex)
+ {
+ OM.LOG.error(ex);
+ throw ex;
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/ClientProtocolFactory.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ClientProtocolFactory.java
index 710578e224..aa926d8909 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/ClientProtocolFactory.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ClientProtocolFactory.java
@@ -8,12 +8,13 @@
* Contributors:
* Eike Stepper - initial API and implementation
**************************************************************************/
-package org.eclipse.net4j.protocol;
+package org.eclipse.spi.net4j;
import org.eclipse.net4j.util.factory.Factory;
/**
* @author Eike Stepper
+ * @since 2.0
*/
public abstract class ClientProtocolFactory extends Factory
{
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java
new file mode 100644
index 0000000000..acb145e4aa
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java
@@ -0,0 +1,402 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2008 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.spi.net4j;
+
+import org.eclipse.net4j.ITransportConfig;
+import org.eclipse.net4j.buffer.IBuffer;
+import org.eclipse.net4j.connector.ConnectorException;
+import org.eclipse.net4j.connector.ConnectorState;
+import org.eclipse.net4j.connector.IConnector;
+import org.eclipse.net4j.connector.IConnectorStateEvent;
+import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
+import org.eclipse.net4j.util.event.Event;
+import org.eclipse.net4j.util.event.INotifier;
+import org.eclipse.net4j.util.om.trace.ContextTracer;
+import org.eclipse.net4j.util.security.INegotiationContext;
+import org.eclipse.net4j.util.security.INegotiator;
+import org.eclipse.net4j.util.security.NegotiationException;
+
+import org.eclipse.internal.net4j.TransportConfig;
+import org.eclipse.internal.net4j.bundle.OM;
+
+import java.text.MessageFormat;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Eike Stepper
+ * @since 2.0
+ */
+public abstract class Connector extends ChannelMultiplexer implements InternalConnector
+{
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, Connector.class);
+
+ private String userID;
+
+ private ITransportConfig config;
+
+ private transient ConnectorState connectorState = ConnectorState.DISCONNECTED;
+
+ @ExcludeFromDump
+ private transient CountDownLatch finishedConnecting;
+
+ @ExcludeFromDump
+ private transient CountDownLatch finishedNegotiating;
+
+ @ExcludeFromDump
+ private transient INegotiationContext negotiationContext;
+
+ @ExcludeFromDump
+ private transient NegotiationException negotiationException;
+
+ public Connector()
+ {
+ }
+
+ @Override
+ public synchronized ITransportConfig getConfig()
+ {
+ if (config == null)
+ {
+ config = new TransportConfig();
+ }
+
+ return config;
+ }
+
+ @Override
+ public synchronized void setConfig(ITransportConfig config)
+ {
+ checkInactive();
+ this.config = config;
+ }
+
+ public INegotiator getNegotiator()
+ {
+ return getConfig().getNegotiator();
+ }
+
+ public void setNegotiator(INegotiator negotiator)
+ {
+ getConfig().setNegotiator(negotiator);
+ }
+
+ public INegotiationContext getNegotiationContext()
+ {
+ return negotiationContext;
+ }
+
+ public boolean isClient()
+ {
+ return getLocation() == Location.CLIENT;
+ }
+
+ public boolean isServer()
+ {
+ return getLocation() == Location.SERVER;
+ }
+
+ public String getUserID()
+ {
+ return userID;
+ }
+
+ public void setUserID(String userID)
+ {
+ checkState(getState() != ConnectorState.CONNECTED, "Connector is already connected");
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Setting userID {0} for {1}", userID, this);
+ }
+
+ this.userID = userID;
+ }
+
+ public ConnectorState getState()
+ {
+ return connectorState;
+ }
+
+ public void setState(ConnectorState newState) throws ConnectorException
+ {
+ ConnectorState oldState = getState();
+ if (newState != oldState)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Setting state {0} (was {1}) for {2}", newState, oldState.toString().toLowerCase(), this);
+ }
+
+ connectorState = newState;
+ switch (newState)
+ {
+ case DISCONNECTED:
+ if (finishedConnecting != null)
+ {
+ finishedConnecting.countDown();
+ finishedConnecting = null;
+ }
+
+ if (finishedNegotiating != null)
+ {
+ finishedNegotiating.countDown();
+ finishedNegotiating = null;
+ }
+ break;
+
+ case CONNECTING:
+ finishedConnecting = new CountDownLatch(1);
+ finishedNegotiating = new CountDownLatch(1);
+ // The concrete implementation must advance state to NEGOTIATING or CONNECTED
+ break;
+
+ case NEGOTIATING:
+ finishedConnecting.countDown();
+ negotiationContext = createNegotiationContext();
+ getNegotiator().negotiate(negotiationContext);
+ break;
+
+ case CONNECTED:
+ negotiationContext = null;
+ deferredActivate();
+ finishedConnecting.countDown();
+ finishedNegotiating.countDown();
+ break;
+ }
+
+ fireEvent(new ConnectorStateEvent(this, oldState, newState));
+ }
+ }
+
+ public boolean isDisconnected()
+ {
+ return connectorState == ConnectorState.DISCONNECTED;
+ }
+
+ public boolean isConnecting()
+ {
+ return connectorState == ConnectorState.CONNECTING;
+ }
+
+ public boolean isNegotiating()
+ {
+ return connectorState == ConnectorState.NEGOTIATING;
+ }
+
+ public boolean isConnected()
+ {
+ if (negotiationException != null)
+ {
+ throw new ConnectorException("Connector negotiation failed", negotiationException);
+ }
+
+ return connectorState == ConnectorState.CONNECTED;
+ }
+
+ public void connectAsync() throws ConnectorException
+ {
+ try
+ {
+ activate();
+ }
+ catch (ConnectorException ex)
+ {
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ throw new ConnectorException(ex);
+ }
+ }
+
+ public boolean waitForConnection(long timeout) throws ConnectorException
+ {
+ final long MAX_POLL_INTERVAL = 100L;
+ boolean withTimeout = timeout != NO_TIMEOUT;
+
+ try
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Waiting for connection...");
+ }
+
+ for (;;)
+ {
+ long t = MAX_POLL_INTERVAL;
+ if (withTimeout)
+ {
+ t = Math.min(MAX_POLL_INTERVAL, timeout);
+ timeout -= MAX_POLL_INTERVAL;
+ }
+
+ if (t <= 0)
+ {
+ break;
+ }
+
+ if (finishedNegotiating == null)
+ {
+ break;
+ }
+
+ if (finishedNegotiating.await(t, TimeUnit.MILLISECONDS))
+ {
+ break;
+ }
+ }
+
+ return isConnected();
+ }
+ catch (InterruptedException ex)
+ {
+ return false;
+ }
+ }
+
+ public boolean connect(long timeout) throws ConnectorException
+ {
+ connectAsync();
+ return waitForConnection(timeout);
+ }
+
+ public boolean connect() throws ConnectorException
+ {
+ return connect(NO_TIMEOUT);
+ }
+
+ public ConnectorException disconnect()
+ {
+ Exception ex = deactivate();
+ if (ex == null)
+ {
+ return null;
+ }
+
+ if (ex instanceof ConnectorException)
+ {
+ return (ConnectorException)ex;
+ }
+
+ return new ConnectorException(ex);
+ }
+
+ public short getBufferCapacity()
+ {
+ return getConfig().getBufferProvider().getBufferCapacity();
+ }
+
+ public IBuffer provideBuffer()
+ {
+ return getConfig().getBufferProvider().provideBuffer();
+ }
+
+ public void retainBuffer(IBuffer buffer)
+ {
+ getConfig().getBufferProvider().retainBuffer(buffer);
+ }
+
+ protected void leaveConnecting()
+ {
+ if (getNegotiator() == null)
+ {
+ setState(ConnectorState.CONNECTED);
+ }
+ else
+ {
+ setState(ConnectorState.NEGOTIATING);
+ }
+ }
+
+ @Override
+ protected abstract INegotiationContext createNegotiationContext();
+
+ protected NegotiationException getNegotiationException()
+ {
+ return negotiationException;
+ }
+
+ protected void setNegotiationException(NegotiationException negotiationException)
+ {
+ this.negotiationException = negotiationException;
+ }
+
+ @Override
+ protected boolean isDeferredActivation()
+ {
+ return true;
+ }
+
+ @Override
+ protected void doBeforeActivate() throws Exception
+ {
+ super.doBeforeActivate();
+ if (getConfig().getBufferProvider() == null)
+ {
+ throw new IllegalStateException("getConfig().getBufferProvider() == null");
+ }
+ }
+
+ @Override
+ protected void doActivate() throws Exception
+ {
+ super.doActivate();
+ setState(ConnectorState.CONNECTING);
+ }
+
+ @Override
+ protected void doDeactivate() throws Exception
+ {
+ setState(ConnectorState.DISCONNECTED);
+ super.doDeactivate();
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private static class ConnectorStateEvent extends Event implements IConnectorStateEvent
+ {
+ private static final long serialVersionUID = 1L;
+
+ private ConnectorState oldState;
+
+ private ConnectorState newState;
+
+ public ConnectorStateEvent(INotifier notifier, ConnectorState oldState, ConnectorState newState)
+ {
+ super(notifier);
+ this.oldState = oldState;
+ this.newState = newState;
+ }
+
+ public IConnector getConnector()
+ {
+ return (IConnector)getSource();
+ }
+
+ public ConnectorState getOldState()
+ {
+ return oldState;
+ }
+
+ public ConnectorState getNewState()
+ {
+ return newState;
+ }
+
+ @Override
+ public String toString()
+ {
+ return MessageFormat.format("ConnectorStateEvent[source={0}, oldState={1}, newState={2}]", getSource(),
+ getOldState(), getNewState());
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java
index 556a1cfda5..90f4039fa4 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java
@@ -27,9 +27,12 @@ public interface InternalChannel extends IChannel, IBufferProvider, ILifecycle.I
/**
* @since 2.0
*/
- public void setUserID(String userID);
+ public void setID(short id);
- public void setChannelIndex(short channelIndex);
+ /**
+ * @since 2.0
+ */
+ public void setUserID(String userID);
public ExecutorService getReceiveExecutor();
@@ -43,9 +46,4 @@ public interface InternalChannel extends IChannel, IBufferProvider, ILifecycle.I
public void handleBufferFromMultiplexer(IBuffer buffer);
public Queue<IBuffer> getSendQueue();
-
- /**
- * @since 2.0
- */
- public void finishDeactivate(boolean inverse);
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/Protocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
index 69a8607a2a..3139624d60 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/Protocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
@@ -8,31 +8,77 @@
* Contributors:
* Eike Stepper - initial API and implementation
**************************************************************************/
-package org.eclipse.net4j.protocol;
+package org.eclipse.spi.net4j;
import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.channel.IChannel;
+import org.eclipse.net4j.protocol.IProtocol;
+import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
+import org.eclipse.net4j.util.event.IListener;
+import org.eclipse.net4j.util.lifecycle.ILifecycle;
import org.eclipse.net4j.util.lifecycle.Lifecycle;
+import org.eclipse.net4j.util.lifecycle.LifecycleEventAdapter;
import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
+ * @since 2.0
*/
public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle implements IProtocol<INFRA_STRUCTURE>
{
- private IChannel channel;
+ private ExecutorService executorService;
private IBufferProvider bufferProvider;
- private ExecutorService executorService;
-
private INFRA_STRUCTURE infraStructure;
+ private IChannel channel;
+
+ @ExcludeFromDump
+ private transient IListener channelListener = new LifecycleEventAdapter()
+ {
+ @Override
+ protected void onDeactivated(ILifecycle lifecycle)
+ {
+ handleChannelDeactivation();
+ };
+ };
+
public Protocol()
{
}
+ public ExecutorService getExecutorService()
+ {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService)
+ {
+ this.executorService = executorService;
+ }
+
+ public IBufferProvider getBufferProvider()
+ {
+ return bufferProvider;
+ }
+
+ public void setBufferProvider(IBufferProvider bufferProvider)
+ {
+ this.bufferProvider = bufferProvider;
+ }
+
+ public INFRA_STRUCTURE getInfraStructure()
+ {
+ return infraStructure;
+ }
+
+ public void setInfraStructure(INFRA_STRUCTURE infraStructure)
+ {
+ this.infraStructure = infraStructure;
+ }
+
/**
* @since 2.0
*/
@@ -62,39 +108,29 @@ public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle implements IPr
return channel;
}
- public void setChannel(IChannel channel)
- {
- this.channel = channel;
- }
-
- public IBufferProvider getBufferProvider()
- {
- return bufferProvider;
- }
-
- public void setBufferProvider(IBufferProvider bufferProvider)
- {
- this.bufferProvider = bufferProvider;
- }
-
- public ExecutorService getExecutorService()
+ public void setChannel(IChannel newChannel)
{
- return executorService;
+ if (channel != newChannel)
+ {
+ if (channel != null)
+ {
+ channel.removeListener(channelListener);
+ }
+
+ channel = newChannel;
+ if (channel != null)
+ {
+ channel.addListener(channelListener);
+ }
+ }
}
- public void setExecutorService(ExecutorService executorService)
- {
- this.executorService = executorService;
- }
-
- public INFRA_STRUCTURE getInfraStructure()
- {
- return infraStructure;
- }
-
- public void setInfraStructure(INFRA_STRUCTURE infraStructure)
+ /**
+ * @since 2.0
+ */
+ protected void handleChannelDeactivation()
{
- this.infraStructure = infraStructure;
+ deactivate();
}
@Override
@@ -109,7 +145,7 @@ public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle implements IPr
@Override
protected void doDeactivate() throws Exception
{
- channel = null;
+ setChannel(null);
super.doDeactivate();
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/ServerProtocolFactory.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ServerProtocolFactory.java
index e4fdb76ffb..b4a30d1078 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/ServerProtocolFactory.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ServerProtocolFactory.java
@@ -8,12 +8,13 @@
* Contributors:
* Eike Stepper - initial API and implementation
**************************************************************************/
-package org.eclipse.net4j.protocol;
+package org.eclipse.spi.net4j;
import org.eclipse.net4j.util.factory.Factory;
/**
* @author Eike Stepper
+ * @since 2.0
*/
public abstract class ServerProtocolFactory extends Factory
{

Back to the top