Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2019-02-01 16:04:51 +0000
committerEike Stepper2019-02-01 16:04:51 +0000
commitddbf0ebab3920ef12645400a1b929d8accd97831 (patch)
tree5e384f10a49d84d3a0e50c927d444481c87616ee /plugins/org.eclipse.net4j
parent3ff7e9362774a011ceb8e5937ab4f20e87fe27b0 (diff)
downloadcdo-ddbf0ebab3920ef12645400a1b929d8accd97831.tar.gz
cdo-ddbf0ebab3920ef12645400a1b929d8accd97831.tar.xz
cdo-ddbf0ebab3920ef12645400a1b929d8accd97831.zip
[544045] Various concurrency improvements (IWorkSerializer, ThreadPool, RWOLockManager)
https://bugs.eclipse.org/bugs/show_bug.cgi?id=544045
Diffstat (limited to 'plugins/org.eclipse.net4j')
-rw-r--r--plugins/org.eclipse.net4j/.settings/.api_filters9
-rw-r--r--plugins/org.eclipse.net4j/META-INF/MANIFEST.MF32
-rw-r--r--plugins/org.eclipse.net4j/pom.xml2
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java11
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java2
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java12
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java4
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java24
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java38
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java66
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java11
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java8
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java3
13 files changed, 133 insertions, 89 deletions
diff --git a/plugins/org.eclipse.net4j/.settings/.api_filters b/plugins/org.eclipse.net4j/.settings/.api_filters
index a2d1ab5b6b..1af03789e4 100644
--- a/plugins/org.eclipse.net4j/.settings/.api_filters
+++ b/plugins/org.eclipse.net4j/.settings/.api_filters
@@ -32,6 +32,15 @@
</message_arguments>
</filter>
</resource>
+ <resource path="src/org/eclipse/net4j/buffer/IBuffer.java" type="org.eclipse.net4j.buffer.IBuffer">
+ <filter id="389242988">
+ <message_arguments>
+ <message_argument value="org.eclipse.net4j.buffer.IBuffer"/>
+ <message_argument value="HEADER_SIZE"/>
+ <message_argument value="4"/>
+ </message_arguments>
+ </filter>
+ </resource>
<resource path="src/org/eclipse/net4j/connector/IConnector.java" type="org.eclipse.net4j.connector.IConnector">
<filter id="571473929">
<message_arguments>
diff --git a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
index 2fe2a2bea0..974a05cf28 100644
--- a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-SymbolicName: org.eclipse.net4j;singleton:=true
-Bundle-Version: 4.7.100.qualifier
+Bundle-Version: 4.8.0.qualifier
Bundle-Name: %pluginName
Bundle-Vendor: %providerName
Bundle-Localization: plugin
@@ -11,7 +11,7 @@ Bundle-RequiredExecutionEnvironment: J2SE-1.5
Bundle-ClassPath: .
Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resolution:=optional,
org.eclipse.net4j.util;bundle-version="[3.0.0,4.0.0)";visibility:=reexport
-Export-Package: org.eclipse.internal.net4j;version="4.7.100";
+Export-Package: org.eclipse.internal.net4j;version="4.8.0";
x-friends:="org.eclipse.net4j.http.server,
org.eclipse.net4j.jvm,
org.eclipse.net4j.tcp,
@@ -19,7 +19,7 @@ Export-Package: org.eclipse.internal.net4j;version="4.7.100";
org.eclipse.net4j.http.common,
org.eclipse.net4j.http.tests,
org.eclipse.net4j.tests",
- org.eclipse.internal.net4j.buffer;version="4.7.100";
+ org.eclipse.internal.net4j.buffer;version="4.8.0";
x-friends:="org.eclipse.net4j.http.server,
org.eclipse.net4j.jvm,
org.eclipse.net4j.tcp,
@@ -28,18 +28,18 @@ Export-Package: org.eclipse.internal.net4j;version="4.7.100";
org.eclipse.net4j.http.tests,
org.eclipse.net4j.tests,
org.eclipse.net4j.trace",
- org.eclipse.internal.net4j.bundle;version="4.7.100";x-internal:=true,
- org.eclipse.net4j;version="4.7.100",
- org.eclipse.net4j.acceptor;version="4.7.100",
- org.eclipse.net4j.buffer;version="4.7.100",
- org.eclipse.net4j.channel;version="4.7.100",
- org.eclipse.net4j.connector;version="4.7.100",
- org.eclipse.net4j.protocol;version="4.7.100",
- org.eclipse.net4j.signal;version="4.7.100",
- org.eclipse.net4j.signal.confirmation;version="4.7.100",
- org.eclipse.net4j.signal.heartbeat;version="4.7.100",
- org.eclipse.net4j.signal.security;version="4.7.100",
- org.eclipse.net4j.signal.wrapping;version="4.7.100",
- org.eclipse.spi.net4j;version="4.7.100"
+ org.eclipse.internal.net4j.bundle;version="4.8.0";x-internal:=true,
+ org.eclipse.net4j;version="4.8.0",
+ org.eclipse.net4j.acceptor;version="4.8.0",
+ org.eclipse.net4j.buffer;version="4.8.0",
+ org.eclipse.net4j.channel;version="4.8.0",
+ org.eclipse.net4j.connector;version="4.8.0",
+ org.eclipse.net4j.protocol;version="4.8.0",
+ org.eclipse.net4j.signal;version="4.8.0",
+ org.eclipse.net4j.signal.confirmation;version="4.8.0",
+ org.eclipse.net4j.signal.heartbeat;version="4.8.0",
+ org.eclipse.net4j.signal.security;version="4.8.0",
+ org.eclipse.net4j.signal.wrapping;version="4.8.0",
+ org.eclipse.spi.net4j;version="4.8.0"
Eclipse-BuddyPolicy: registered
Automatic-Module-Name: org.eclipse.net4j
diff --git a/plugins/org.eclipse.net4j/pom.xml b/plugins/org.eclipse.net4j/pom.xml
index c6b9b5932e..ae00c6a5e3 100644
--- a/plugins/org.eclipse.net4j/pom.xml
+++ b/plugins/org.eclipse.net4j/pom.xml
@@ -25,7 +25,7 @@
<groupId>org.eclipse.emf.cdo</groupId>
<artifactId>org.eclipse.net4j</artifactId>
- <version>4.7.100-SNAPSHOT</version>
+ <version>4.8.0-SNAPSHOT</version>
<packaging>eclipse-plugin</packaging>
</project>
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
index b7ddf75411..cd65d47f25 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
@@ -12,7 +12,6 @@
package org.eclipse.internal.net4j.buffer;
import org.eclipse.net4j.buffer.BufferState;
-import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.util.HexUtil;
import org.eclipse.net4j.util.IErrorHandler;
@@ -196,7 +195,7 @@ public class Buffer implements InternalBuffer
if (state == BufferState.INITIAL)
{
- byteBuffer.limit(IBuffer.HEADER_SIZE);
+ byteBuffer.limit(HEADER_SIZE);
state = BufferState.READING_HEADER;
}
@@ -298,7 +297,7 @@ public class Buffer implements InternalBuffer
this.channelID = channelID;
byteBuffer.clear();
- byteBuffer.position(IBuffer.HEADER_SIZE);
+ byteBuffer.position(HEADER_SIZE);
}
return byteBuffer;
@@ -340,7 +339,7 @@ public class Buffer implements InternalBuffer
throw new IllegalStateException(toString() + ": channelID == NO_CHANNEL"); //$NON-NLS-1$
}
- int payloadSize = byteBuffer.position() - IBuffer.HEADER_SIZE + MAKE_PAYLOAD_SIZE_NON_ZERO;
+ int payloadSize = byteBuffer.position() - HEADER_SIZE + MAKE_PAYLOAD_SIZE_NON_ZERO;
boolean eos = isEOS();
if (eos)
@@ -402,7 +401,7 @@ public class Buffer implements InternalBuffer
}
byteBuffer.flip();
- byteBuffer.position(IBuffer.HEADER_SIZE);
+ byteBuffer.position(HEADER_SIZE);
state = BufferState.GETTING;
}
catch (RuntimeException ex)
@@ -508,7 +507,7 @@ public class Buffer implements InternalBuffer
if (state == BufferState.PUTTING && !showHeader)
{
- byteBuffer.position(IBuffer.HEADER_SIZE);
+ byteBuffer.position(HEADER_SIZE);
}
StringBuilder builder = new StringBuilder();
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
index 363e27883d..91de7d4f95 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
@@ -115,7 +115,7 @@ public class BufferInputStream extends InputStream implements IBufferHandler
public void handleBuffer(IBuffer buffer)
{
- // Stream has been closed - ignore the new buffer
+ // If stream has been closed - ignore the new buffer.
if (buffers != null)
{
buffers.add(buffer);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java
index 7f7d5f3877..765f24d091 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java
@@ -87,7 +87,17 @@ public interface IBuffer
*/
public static final short MAX_CHANNEL = Short.MAX_VALUE;
- public static final short HEADER_SIZE = 4;
+ /**
+ * @since 4.8
+ */
+ public static final int CHANNEL_ID_BYTES = Short.BYTES;
+
+ /**
+ * @since 4.8
+ */
+ public static final int PAYLOAD_SIZE_BYTES = Short.BYTES;
+
+ public static final short HEADER_SIZE = CHANNEL_ID_BYTES + PAYLOAD_SIZE_BYTES;
/**
* Returns the {@link IBufferProvider} that has provided this buffer and that this buffer will be returned to when its
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java
index 8e839e1b35..9d6b959251 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java
@@ -14,8 +14,6 @@ import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
-import org.eclipse.spi.net4j.InternalChannel;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
@@ -79,7 +77,7 @@ public class ChannelInputStream extends BufferInputStream
{
if (isCCAM())
{
- ExecutorService executorService = ((InternalChannel)channel).getReceiveExecutor();
+ ExecutorService executorService = ConcurrencyUtil.getExecutorService(channel);
executorService.submit(new Runnable()
{
public void run()
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
index 3fb4ad76da..bb2e62e529 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
@@ -14,6 +14,7 @@ import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.BufferOutputStream;
import org.eclipse.net4j.util.ImplementationError;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.RunnableWithName;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.om.monitor.Monitor;
@@ -32,6 +33,8 @@ import java.util.concurrent.Future;
*/
public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmation<RESULT>
{
+ private static final String MAIN_MONITOR_NAME = RequestWithMonitoring.class.getSimpleName() + "-MainMonitor";
+
/**
* @since 2.0
*/
@@ -47,7 +50,7 @@ public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmat
*/
public static final int DEFAULT_MONITOR_TIMEOUT_SECONDS = 10;
- private OMMonitor mainMonitor;
+ private volatile OMMonitor mainMonitor;
private OMMonitor remoteMonitor;
@@ -137,18 +140,29 @@ public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmat
ExecutorService executorService = getCancelationExecutorService();
if (executorService != null)
{
- executorService.execute(new Runnable()
+ executorService.execute(new RunnableWithName()
{
- public void run()
+ @Override
+ public String getName()
{
+ return MAIN_MONITOR_NAME;
+ }
+
+ @Override
+ protected void doRun()
+ {
+ SignalProtocol<?> protocol = getProtocol();
+ int correlationID = getCorrelationID();
+ long cancelationPollInterval = getCancelationPollInterval();
+
while (mainMonitor != null)
{
- ConcurrencyUtil.sleep(getCancelationPollInterval());
+ ConcurrencyUtil.sleep(cancelationPollInterval);
if (mainMonitor != null && mainMonitor.isCanceled())
{
try
{
- new MonitorCanceledRequest(getProtocol(), getCorrelationID()).sendAsync();
+ new MonitorCanceledRequest(protocol, correlationID).sendAsync();
}
catch (Exception ex)
{
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java
index 8dbedaffce..9f106f2464 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java
@@ -14,6 +14,11 @@ import org.eclipse.net4j.util.collection.HashBag;
import org.eclipse.net4j.util.event.IEvent;
import org.eclipse.net4j.util.event.IListener;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Provides {@link Signal signal} execution counts when
* {@link SignalProtocol#addListener(IListener) attached} to a {@link ISignalProtocol signal protocol}.
@@ -23,6 +28,8 @@ import org.eclipse.net4j.util.event.IListener;
*/
public final class SignalCounter implements IListener
{
+ private static final boolean fullyQualifiedNames = Boolean.getBoolean("org.eclipse.net4j.signal.SignalCounter.fullyQualifiedNames");
+
private HashBag<Class<? extends Signal>> signals = new HashBag<Class<? extends Signal>>();
private final ISignalProtocol<?> protocol;
@@ -85,6 +92,37 @@ public final class SignalCounter implements IListener
}
}
+ /**
+ * @since 4.8
+ */
+ public void dump(PrintStream out, boolean clearCountsWhenDone)
+ {
+ synchronized (signals)
+ {
+ Map<String, Class<? extends Signal>> signalTypes = new HashMap<String, Class<? extends Signal>>();
+
+ for (Class<? extends Signal> signalType : signals)
+ {
+ String name = fullyQualifiedNames ? signalType.getName() : signalType.getSimpleName();
+ signalTypes.put(name, signalType);
+ }
+
+ String[] names = signalTypes.keySet().toArray(new String[signalTypes.size()]);
+ Arrays.sort(names);
+
+ for (String name : names)
+ {
+ Class<? extends Signal> signalType = signalTypes.get(name);
+ out.println(name + " = " + signals.getCounterFor(signalType));
+ }
+
+ if (clearCountsWhenDone)
+ {
+ clearCounts();
+ }
+ }
+ }
+
public void notifyEvent(IEvent event)
{
if (event instanceof SignalFinishedEvent)
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
index 70542eac7b..e993794e62 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
@@ -16,11 +16,10 @@ import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferHandler;
import org.eclipse.net4j.channel.IChannelMultiplexer;
import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
-import org.eclipse.net4j.util.concurrent.IWorkSerializer;
import org.eclipse.net4j.util.concurrent.RunnableWithName;
-import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer;
+import org.eclipse.net4j.util.concurrent.SerializingExecutor;
import org.eclipse.net4j.util.event.Event;
import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.lifecycle.Lifecycle;
@@ -55,15 +54,13 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
private short id = IBuffer.NO_CHANNEL;
- private ExecutorService receiveExecutor;
+ private final Executor receiveSerializer = new SerializingExecutor();
/**
* The external handler for buffers passed from the {@link #connector}.
*/
private IBufferHandler receiveHandler;
- private IWorkSerializer receiveSerializer;
-
private transient Queue<IBuffer> sendQueue;
private transient long sentBuffers;
@@ -125,17 +122,19 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
*/
public ExecutorService getExecutorService()
{
- return receiveExecutor;
+ return ConcurrencyUtil.getExecutorService(channelMultiplexer);
}
+ @Deprecated
public ExecutorService getReceiveExecutor()
{
- return receiveExecutor;
+ return null;
}
+ @Deprecated
public void setReceiveExecutor(ExecutorService receiveExecutor)
{
- this.receiveExecutor = receiveExecutor;
+ // Do nothing.
}
public IBufferHandler getReceiveHandler()
@@ -217,8 +216,6 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
* Handles a buffer sent by the multiplexer. Adds work to the receive queue or releases the buffer.
*
* @see InternalChannelMultiplexer#multiplexChannel
- * @see IWorkSerializer
- * @see ReceiverWork
*/
public void handleBufferFromMultiplexer(IBuffer buffer)
{
@@ -232,7 +229,7 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
++receivedBuffers;
ReceiverWork receiverWork = createReceiverWork(buffer);
- receiveSerializer.addWork(receiverWork);
+ receiveSerializer.execute(receiverWork);
}
else
{
@@ -286,26 +283,14 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
{
super.doActivate();
sendQueue = new SendQueue();
- if (receiveExecutor != null)
- {
- receiveSerializer = new ReceiveSerializer2(receiveExecutor);
- LifecycleUtil.activate(receiveSerializer);
- }
- else
- {
- receiveSerializer = new SynchronousWorkSerializer();
- }
+ LifecycleUtil.activate(receiveSerializer);
}
@Override
protected void doDeactivate() throws Exception
{
unregisterFromMultiplexer();
- if (receiveSerializer != null)
- {
- receiveSerializer.dispose();
- receiveSerializer = null;
- }
+ LifecycleUtil.deactivate(receiveSerializer);
if (sendQueue != null)
{
@@ -332,11 +317,9 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
}
/**
- * If the meaning of this type isn't clear, there really should be more of a description here...
- *
* @author Eike Stepper
* @since 4.1
- * @deprecated As of 4.4 use {@link ExecutorWorkSerializer}.
+ * @deprecated As of 4.4 scheduled for future removal.
*/
@Deprecated
protected class ReceiveSerializer extends org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer
@@ -361,29 +344,6 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
* If the meaning of this type isn't clear, there really should be more of a description here...
*
* @author Eike Stepper
- * @since 4.4
- */
- private class ReceiveSerializer2 extends ExecutorWorkSerializer
- {
- public ReceiveSerializer2(Executor executor)
- {
- super(executor);
- }
-
- @Override
- protected void noWork()
- {
- if (isClosed())
- {
- dispose();
- }
- }
- }
-
- /**
- * If the meaning of this type isn't clear, there really should be more of a description here...
- *
- * @author Eike Stepper
*/
protected class ReceiverWork extends RunnableWithName
{
@@ -403,7 +363,7 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
@Override
public String getName()
{
- return "Net4jReceiveSerializer-" + Channel.this; //$NON-NLS-1$
+ return "Net4jReceiver-" + Channel.this; //$NON-NLS-1$
}
@Override
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
index 1755eeba4c..70defaa3a1 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
@@ -22,6 +22,8 @@ import org.eclipse.net4j.protocol.IProtocolProvider;
import org.eclipse.net4j.protocol.ProtocolVersionException;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
import org.eclipse.net4j.util.StringUtil;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException;
import org.eclipse.net4j.util.container.Container;
import org.eclipse.net4j.util.factory.FactoryKey;
@@ -40,6 +42,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
/**
* If the meaning of this type isn't clear, there really should be more of a description here...
@@ -47,7 +50,7 @@ import java.util.concurrent.ConcurrentMap;
* @author Eike Stepper
* @since 2.0
*/
-public abstract class ChannelMultiplexer extends Container<IChannel> implements InternalChannelMultiplexer
+public abstract class ChannelMultiplexer extends Container<IChannel> implements InternalChannelMultiplexer, IExecutorServiceProvider
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, ChannelMultiplexer.class);
@@ -83,6 +86,11 @@ public abstract class ChannelMultiplexer extends Container<IChannel> implements
this.config = Net4jUtil.copyTransportConfig(this, config);
}
+ public ExecutorService getExecutorService()
+ {
+ return ConcurrencyUtil.getExecutorService(config);
+ }
+
public long getOpenChannelTimeout()
{
if (openChannelTimeout == IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT)
@@ -234,7 +242,6 @@ public abstract class ChannelMultiplexer extends Container<IChannel> implements
protected void initChannel(InternalChannel channel, IProtocol<?> protocol)
{
channel.setMultiplexer(this);
- channel.setReceiveExecutor(getConfig().getReceiveExecutor());
if (protocol != null)
{
protocol.setChannel(channel);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java
index bfa3674c1d..71574dd889 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java
@@ -37,8 +37,16 @@ public interface InternalChannel extends IChannel, IBufferProvider, ILifecycle
*/
public void setUserID(String userID);
+ /**
+ * @deprecated As of 4.8 no longer supported.
+ */
+ @Deprecated
public ExecutorService getReceiveExecutor();
+ /**
+ * @deprecated As of 4.8 no longer supported.
+ */
+ @Deprecated
public void setReceiveExecutor(ExecutorService receiveExecutor);
/**
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
index 04a73386a2..060553c08b 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
@@ -14,6 +14,7 @@ import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.protocol.IProtocol2;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.lifecycle.ILifecycle;
@@ -141,7 +142,7 @@ public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle implements IPr
if (channel != null)
{
channel.addListener(channelListener);
- executorService = ((InternalChannel)channel).getReceiveExecutor();
+ executorService = ConcurrencyUtil.getExecutorService(channel);
bufferProvider = (InternalChannel)channel;
}
}

Back to the top