Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2015-07-22 12:25:46 +0000
committerEike Stepper2015-07-22 12:25:46 +0000
commit017c0e91d0dc68b7ead1b4c893f032d3d72e4e31 (patch)
tree43b63e279685bbafc4e131a243609487e5112c99 /plugins
parentc8eba7351ffd2d37e17db3ae3c2a7009d366b769 (diff)
downloadcdo-017c0e91d0dc68b7ead1b4c893f032d3d72e4e31.tar.gz
cdo-017c0e91d0dc68b7ead1b4c893f032d3d72e4e31.tar.xz
cdo-017c0e91d0dc68b7ead1b4c893f032d3d72e4e31.zip
[473277] Enhance ThreadPool and use it as much as possible
https://bugs.eclipse.org/bugs/show_bug.cgi?id=473277
Diffstat (limited to 'plugins')
-rw-r--r--plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/AbstractCDORevisionCache.java115
-rw-r--r--plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheAuditing.java6
-rw-r--r--plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheNonAuditing.java6
-rw-r--r--plugins/org.eclipse.emf.cdo.examples/META-INF/MANIFEST.MF10
-rw-r--r--plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/StandaloneManualExample.java14
-rw-r--r--plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitTransactionIndication.java64
-rw-r--r--plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitXATransactionPhase1Indication.java11
-rw-r--r--plugins/org.eclipse.emf.cdo.server.security/META-INF/MANIFEST.MF10
-rw-r--r--plugins/org.eclipse.emf.cdo.server.security/src/org/eclipse/emf/cdo/server/spi/security/HomeFolderHandler.java3
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/CommitManager.java47
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/QueryManager.java18
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Repository.java14
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/InternalCommitManager.java11
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/LockingManagerTest.java3
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java9
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTest.java14
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTestSuite.java68
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java17
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java16
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java28
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/DelegatingSessionProtocol.java10
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java46
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/AllTests.java4
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/TCPConnectorTest.java26
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMAcceptorDefImplTest.java6
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMConnectorDefImplTest.java5
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/Util.java6
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/AbstractOMTest.java6
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java (renamed from plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/QueueWorkerWorkSerializerTest.java)18
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java58
-rw-r--r--plugins/org.eclipse.net4j.util.defs/META-INF/MANIFEST.MF8
-rw-r--r--plugins/org.eclipse.net4j.util.defs/src/org/eclipse/net4j/util/defs/impl/ThreadPoolDefImpl.java26
-rw-r--r--plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF62
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java4
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java2
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java32
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/DelegatingExecutorService.java102
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java139
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java171
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IExecutorServiceProvider.java22
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java2
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java20
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java384
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java12
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java14
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java14
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java25
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java25
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/CleanableReferenceQueue.java105
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java6
-rw-r--r--plugins/org.eclipse.net4j/META-INF/MANIFEST.MF32
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/TransportConfig.java8
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/BufferPool.java96
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java15
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java35
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java23
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java63
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java4
58 files changed, 1622 insertions, 498 deletions
diff --git a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/AbstractCDORevisionCache.java b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/AbstractCDORevisionCache.java
index 7d9f7ba982..8ae662e7db 100644
--- a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/AbstractCDORevisionCache.java
+++ b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/AbstractCDORevisionCache.java
@@ -14,6 +14,7 @@ package org.eclipse.emf.cdo.internal.common.revision;
import org.eclipse.emf.cdo.common.branch.CDOBranch;
import org.eclipse.emf.cdo.common.branch.CDOBranchManager;
+import org.eclipse.emf.cdo.common.branch.CDOBranchVersion;
import org.eclipse.emf.cdo.common.id.CDOID;
import org.eclipse.emf.cdo.common.revision.CDORevision;
import org.eclipse.emf.cdo.common.revision.CDORevisionKey;
@@ -21,8 +22,11 @@ import org.eclipse.emf.cdo.internal.common.bundle.OM;
import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision;
import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionCache;
+import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
import org.eclipse.net4j.util.event.IListener;
+import org.eclipse.net4j.util.lifecycle.Lifecycle;
import org.eclipse.net4j.util.om.trace.ContextTracer;
+import org.eclipse.net4j.util.ref.CleanableReferenceQueue;
import org.eclipse.net4j.util.ref.ReferenceQueueWorker;
import java.lang.ref.Reference;
@@ -33,20 +37,36 @@ import java.text.MessageFormat;
/**
* @author Eike Stepper
*/
-public abstract class AbstractCDORevisionCache extends ReferenceQueueWorker<InternalCDORevision>
- implements InternalCDORevisionCache
+public abstract class AbstractCDORevisionCache extends Lifecycle implements InternalCDORevisionCache
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_REVISION, AbstractCDORevisionCache.class);
private static boolean disableGC;
+ @ExcludeFromDump
+ private final CleanableReferenceQueue<InternalCDORevision> referenceQueue = new CleanableReferenceQueue<InternalCDORevision>()
+ {
+ @Override
+ protected Reference<InternalCDORevision> createReference(InternalCDORevision revision)
+ {
+ return AbstractCDORevisionCache.this.createReference(revision);
+ }
+
+ @Override
+ protected void cleanReference(Reference<? extends InternalCDORevision> reference)
+ {
+ AbstractCDORevisionCache.this.cleanReference(reference);
+ }
+ };
+
private CDOBranchManager branchManager;
private String name;
public AbstractCDORevisionCache()
{
- setDaemon(true);
+ setPollMillis(ReferenceQueueWorker.DEFAULT_POLL_MILLIS);
+ setMaxWorkPerPoll(ReferenceQueueWorker.DEFAULT_MAX_WORK_PER_POLL);
}
public CDOBranchManager getBranchManager()
@@ -82,49 +102,51 @@ public abstract class AbstractCDORevisionCache extends ReferenceQueueWorker<Inte
this.name = name;
}
- @Override
- public String toString()
+ public long getPollMillis()
{
- return formatName("CDORevisionCache");
+ return referenceQueue.getPollMillis();
}
- @Override
- protected String getThreadName()
+ public void setPollMillis(long pollMillis)
{
- return formatName("CDORevisionCacheCleaner");
+ referenceQueue.setPollMillis(pollMillis);
}
- private String formatName(String prefix)
+ public int getMaxWorkPerPoll()
{
- return prefix + (name == null ? "" : "-" + name);
+ return referenceQueue.getMaxWorkPerPoll();
}
- @Override
- protected void work(Reference<? extends InternalCDORevision> reference)
+ public void setMaxWorkPerPoll(int maxWorkPerPoll)
{
- CDORevisionKey key = (CDORevisionKey)reference;
+ referenceQueue.setMaxWorkPerPoll(maxWorkPerPoll);
+ }
- CDOID id = key.getID();
- CDOBranch branch = key.getBranch();
- int version = key.getVersion();
+ public final void addRevision(CDORevision revision)
+ {
+ referenceQueue.register((InternalCDORevision)revision);
+ doAddRevision(revision);
+ }
- InternalCDORevision revision = (InternalCDORevision)removeRevision(id, branch.getVersion(version));
- if (revision != null)
- {
- // Use revision in eviction event
- key = revision;
- }
+ protected abstract void doAddRevision(CDORevision revision);
- IListener[] listeners = getListeners();
- if (listeners != null)
- {
- fireEvent(new EvictionEventImpl(this, key), listeners);
- }
+ public final CDORevision removeRevision(CDOID id, CDOBranchVersion branchVersion)
+ {
+ referenceQueue.clean();
+ return doRemoveRevision(id, branchVersion);
+ }
- if (TRACER.isEnabled())
- {
- TRACER.format("Evicted {0} from {1}", key, this); //$NON-NLS-1$
- }
+ protected abstract CDORevision doRemoveRevision(CDOID id, CDOBranchVersion branchVersion);
+
+ @Override
+ public String toString()
+ {
+ return formatName("CDORevisionCache");
+ }
+
+ private String formatName(String prefix)
+ {
+ return prefix + (name == null ? "" : "-" + name);
}
protected Reference<InternalCDORevision> createReference(CDORevision revision)
@@ -139,7 +161,7 @@ public abstract class AbstractCDORevisionCache extends ReferenceQueueWorker<Inte
TRACER.format("Adding revision {0} to {1}", revision, this); //$NON-NLS-1$
}
- return new CacheSoftReference((InternalCDORevision)revision, getQueue());
+ return new CacheSoftReference((InternalCDORevision)revision, referenceQueue);
}
private Reference<InternalCDORevision> createStrongReference(CDORevision revision)
@@ -152,6 +174,33 @@ public abstract class AbstractCDORevisionCache extends ReferenceQueueWorker<Inte
return new CacheStrongReference((InternalCDORevision)revision);
}
+ protected void cleanReference(Reference<? extends InternalCDORevision> reference)
+ {
+ CDORevisionKey key = (CDORevisionKey)reference;
+
+ CDOID id = key.getID();
+ CDOBranch branch = key.getBranch();
+ int version = key.getVersion();
+
+ InternalCDORevision revision = (InternalCDORevision)removeRevision(id, branch.getVersion(version));
+ if (revision != null)
+ {
+ // Use revision in eviction event
+ key = revision;
+ }
+
+ IListener[] listeners = getListeners();
+ if (listeners != null)
+ {
+ fireEvent(new EvictionEventImpl(this, key), listeners);
+ }
+
+ if (TRACER.isEnabled())
+ {
+ TRACER.format("Evicted {0} from {1}", key, this); //$NON-NLS-1$
+ }
+ }
+
/**
* @author Eike Stepper
*/
diff --git a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheAuditing.java b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheAuditing.java
index c89da07d7f..391c32cd86 100644
--- a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheAuditing.java
+++ b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheAuditing.java
@@ -168,7 +168,8 @@ public class CDORevisionCacheAuditing extends AbstractCDORevisionCache
return result;
}
- public void addRevision(CDORevision revision)
+ @Override
+ protected void doAddRevision(CDORevision revision)
{
CheckUtil.checkArg(revision, "revision");
@@ -194,7 +195,8 @@ public class CDORevisionCacheAuditing extends AbstractCDORevisionCache
}
}
- public InternalCDORevision removeRevision(CDOID id, CDOBranchVersion branchVersion)
+ @Override
+ protected InternalCDORevision doRemoveRevision(CDOID id, CDOBranchVersion branchVersion)
{
CDOBranch branch = branchVersion.getBranch();
checkBranch(branch);
diff --git a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheNonAuditing.java b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheNonAuditing.java
index f5889d250a..f4b2a8a893 100644
--- a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheNonAuditing.java
+++ b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheNonAuditing.java
@@ -179,7 +179,8 @@ public class CDORevisionCacheNonAuditing extends AbstractCDORevisionCache
return result;
}
- public void addRevision(CDORevision revision)
+ @Override
+ protected void doAddRevision(CDORevision revision)
{
CheckUtil.checkArg(revision, "revision");
checkBranch(revision.getBranch());
@@ -208,7 +209,8 @@ public class CDORevisionCacheNonAuditing extends AbstractCDORevisionCache
}
}
- public InternalCDORevision removeRevision(CDOID id, CDOBranchVersion branchVersion)
+ @Override
+ protected InternalCDORevision doRemoveRevision(CDOID id, CDOBranchVersion branchVersion)
{
checkBranch(branchVersion.getBranch());
synchronized (revisions)
diff --git a/plugins/org.eclipse.emf.cdo.examples/META-INF/MANIFEST.MF b/plugins/org.eclipse.emf.cdo.examples/META-INF/MANIFEST.MF
index b372da0479..1e9592340d 100644
--- a/plugins/org.eclipse.emf.cdo.examples/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.emf.cdo.examples/META-INF/MANIFEST.MF
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-SymbolicName: org.eclipse.emf.cdo.examples;singleton:=true
-Bundle-Version: 4.0.400.qualifier
+Bundle-Version: 4.0.500.qualifier
Bundle-Name: %pluginName
Bundle-Vendor: %providerName
Bundle-Activator: org.eclipse.emf.cdo.internal.examples.bundle.OM$Activator
@@ -20,7 +20,7 @@ Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resoluti
org.eclipse.net4j.tcp;bundle-version="[4.0.0,5.0.0)",
org.eclipse.net4j.db.h2;bundle-version="[4.0.0,5.0.0)"
Import-Package: org.h2.jdbcx;version="[1.0.0,2.0.0)"
-Export-Package: org.eclipse.emf.cdo.examples;version="4.0.400";x-internal:=true,
- org.eclipse.emf.cdo.examples.server;version="4.0.400";x-internal:=true,
- org.eclipse.emf.cdo.examples.server.offline;version="4.0.400";x-internal:=true,
- org.eclipse.emf.cdo.internal.examples.bundle;version="4.0.400";x-internal:=true
+Export-Package: org.eclipse.emf.cdo.examples;version="4.0.500";x-internal:=true,
+ org.eclipse.emf.cdo.examples.server;version="4.0.500";x-internal:=true,
+ org.eclipse.emf.cdo.examples.server.offline;version="4.0.500";x-internal:=true,
+ org.eclipse.emf.cdo.internal.examples.bundle;version="4.0.500";x-internal:=true
diff --git a/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/StandaloneManualExample.java b/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/StandaloneManualExample.java
index 01915f9e56..1ec52e6ed8 100644
--- a/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/StandaloneManualExample.java
+++ b/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/StandaloneManualExample.java
@@ -23,6 +23,7 @@ import org.eclipse.net4j.FactoriesProtocolProvider;
import org.eclipse.net4j.Net4jUtil;
import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.protocol.IProtocolProvider;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.om.OMPlatform;
import org.eclipse.net4j.util.om.log.PrintLogHandler;
@@ -31,8 +32,6 @@ import org.eclipse.net4j.util.om.trace.PrintTraceHandler;
import org.eclipse.emf.ecore.EObject;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
/**
* @author Eike Stepper
@@ -47,16 +46,7 @@ public class StandaloneManualExample
OMPlatform.INSTANCE.addTraceHandler(PrintTraceHandler.CONSOLE);
// Prepare receiveExecutor
- final ThreadGroup threadGroup = new ThreadGroup("net4j"); //$NON-NLS-1$
- ExecutorService receiveExecutor = Executors.newCachedThreadPool(new ThreadFactory()
- {
- public Thread newThread(Runnable r)
- {
- Thread thread = new Thread(threadGroup, r);
- thread.setDaemon(true);
- return thread;
- }
- });
+ ExecutorService receiveExecutor = ThreadPool.create();
// Prepare bufferProvider
IBufferProvider bufferProvider = Net4jUtil.createBufferPool();
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitTransactionIndication.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitTransactionIndication.java
index 0a15407ab3..83e84b56dd 100644
--- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitTransactionIndication.java
+++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitTransactionIndication.java
@@ -85,14 +85,10 @@ public class CommitTransactionIndication extends CDOServerIndicationWithMonitori
return commitContext.getPackageRegistry();
}
- @Override
- protected void indicatingFailed()
+ protected void initializeCommitContext(CDODataInput in) throws Exception
{
- if (commitContext != null)
- {
- commitContext.postCommit(false);
- commitContext = null;
- }
+ int viewID = in.readInt();
+ commitContext = getTransaction(viewID).createCommitContext();
}
@Override
@@ -101,8 +97,8 @@ public class CommitTransactionIndication extends CDOServerIndicationWithMonitori
try
{
monitor.begin(OMMonitor.TEN);
- indicatingCommit(in, monitor.fork(OMMonitor.ONE));
- indicatingCommit(monitor.fork(OMMonitor.TEN - OMMonitor.ONE));
+ indicatingRead(in, monitor.fork(OMMonitor.ONE));
+ indicatingCommit(in, monitor.fork(OMMonitor.TEN - OMMonitor.ONE));
}
catch (IOException ex)
{
@@ -119,7 +115,7 @@ public class CommitTransactionIndication extends CDOServerIndicationWithMonitori
}
}
- protected void indicatingCommit(CDODataInput in, OMMonitor monitor) throws Exception
+ protected void indicatingRead(CDODataInput in, OMMonitor monitor) throws Exception
{
// Create commit context
initializeCommitContext(in);
@@ -292,32 +288,19 @@ public class CommitTransactionIndication extends CDOServerIndicationWithMonitori
}
}
- private ResourceSet createResourceSet(InternalCDOPackageRegistry packageRegistry)
- {
- ResourceSet resourceSet = new ResourceSetImpl()
- {
- @Override
- protected void demandLoad(Resource resource) throws IOException
- {
- // Do nothing: we don't want this ResourceSet to attempt demand-loads.
- }
- };
-
- Resource.Factory resourceFactory = new EcoreResourceFactoryImpl();
- resourceSet.getResourceFactoryRegistry().getExtensionToFactoryMap().put("*", resourceFactory); //$NON-NLS-1$
- resourceSet.setPackageRegistry(packageRegistry);
- return resourceSet;
- }
-
- protected void initializeCommitContext(CDODataInput in) throws Exception
+ protected void indicatingCommit(CDODataInput in, OMMonitor monitor)
{
- int viewID = in.readInt();
- commitContext = getTransaction(viewID).createCommitContext();
+ getRepository().commit(commitContext, monitor);
}
- protected void indicatingCommit(OMMonitor monitor)
+ @Override
+ protected void indicatingFailed()
{
- getRepository().commit(commitContext, monitor);
+ if (commitContext != null)
+ {
+ commitContext.postCommit(false);
+ commitContext = null;
+ }
}
@Override
@@ -466,4 +449,21 @@ public class CommitTransactionIndication extends CDOServerIndicationWithMonitori
throw new IllegalStateException("Illegal transaction: " + view); //$NON-NLS-1$
}
+
+ private ResourceSet createResourceSet(InternalCDOPackageRegistry packageRegistry)
+ {
+ ResourceSet resourceSet = new ResourceSetImpl()
+ {
+ @Override
+ protected void demandLoad(Resource resource) throws IOException
+ {
+ // Do nothing: we don't want this ResourceSet to attempt demand-loads.
+ }
+ };
+
+ Resource.Factory resourceFactory = new EcoreResourceFactoryImpl();
+ resourceSet.getResourceFactoryRegistry().getExtensionToFactoryMap().put("*", resourceFactory); //$NON-NLS-1$
+ resourceSet.setPackageRegistry(packageRegistry);
+ return resourceSet;
+ }
}
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitXATransactionPhase1Indication.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitXATransactionPhase1Indication.java
index f70b26ced3..409eac0fff 100644
--- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitXATransactionPhase1Indication.java
+++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitXATransactionPhase1Indication.java
@@ -30,10 +30,17 @@ public class CommitXATransactionPhase1Indication extends CommitTransactionIndica
}
@Override
- protected void indicatingCommit(OMMonitor monitor)
+ protected boolean closeInputStreamAfterMe()
+ {
+ // The commit manager processes phase1 asynchronously, so don't close the input stream on him.
+ return false;
+ }
+
+ @Override
+ protected void indicatingCommit(CDODataInput in, OMMonitor monitor)
{
// Register transactionContext
- getRepository().getCommitManager().preCommit(commitContext, monitor);
+ getRepository().getCommitManager().preCommit(commitContext, in, monitor);
}
@Override
diff --git a/plugins/org.eclipse.emf.cdo.server.security/META-INF/MANIFEST.MF b/plugins/org.eclipse.emf.cdo.server.security/META-INF/MANIFEST.MF
index 04d3066cb9..b4daf94f68 100644
--- a/plugins/org.eclipse.emf.cdo.server.security/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.emf.cdo.server.security/META-INF/MANIFEST.MF
@@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-SymbolicName: org.eclipse.emf.cdo.server.security;singleton:=true
Bundle-Name: %pluginName
-Bundle-Version: 4.3.100.qualifier
+Bundle-Version: 4.3.200.qualifier
Bundle-ClassPath: .
Bundle-Vendor: %providerName
Bundle-Localization: plugin
@@ -12,15 +12,15 @@ Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)",
org.eclipse.emf.cdo.security;bundle-version="[4.1.0,5.0.0)",
org.eclipse.emf.cdo.net4j;bundle-version="[4.1.0,5.0.0)",
org.eclipse.net4j.jvm;bundle-version="[4.1.0,5.0.0)"
-Export-Package: org.eclipse.emf.cdo.server.internal.security;version="4.3.100";
+Export-Package: org.eclipse.emf.cdo.server.internal.security;version="4.3.200";
x-friends:="org.eclipse.emf.cdo.tests,
org.eclipse.emf.cdo.tests.db,
org.eclipse.emf.cdo.tests.db4o,
org.eclipse.emf.cdo.tests.hibernate,
org.eclipse.emf.cdo.tests.mongodb,
org.eclipse.emf.cdo.tests.objectivity",
- org.eclipse.emf.cdo.server.internal.security.bundle;version="4.3.100";x-internal:=true,
- org.eclipse.emf.cdo.server.security;version="4.3.100",
- org.eclipse.emf.cdo.server.spi.security;version="4.3.100"
+ org.eclipse.emf.cdo.server.internal.security.bundle;version="4.3.200";x-internal:=true,
+ org.eclipse.emf.cdo.server.security;version="4.3.200",
+ org.eclipse.emf.cdo.server.spi.security;version="4.3.200"
Bundle-ActivationPolicy: lazy
Bundle-Activator: org.eclipse.emf.cdo.server.internal.security.bundle.OM$Activator
diff --git a/plugins/org.eclipse.emf.cdo.server.security/src/org/eclipse/emf/cdo/server/spi/security/HomeFolderHandler.java b/plugins/org.eclipse.emf.cdo.server.security/src/org/eclipse/emf/cdo/server/spi/security/HomeFolderHandler.java
index 09738b6cfb..f2a9651c9c 100644
--- a/plugins/org.eclipse.emf.cdo.server.security/src/org/eclipse/emf/cdo/server/spi/security/HomeFolderHandler.java
+++ b/plugins/org.eclipse.emf.cdo.server.security/src/org/eclipse/emf/cdo/server/spi/security/HomeFolderHandler.java
@@ -26,6 +26,7 @@ import org.eclipse.emf.cdo.transaction.CDOTransaction;
import org.eclipse.emf.cdo.view.CDOView;
import org.eclipse.net4j.util.concurrent.ExecutorServiceFactory;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.container.IManagedContainer;
import org.eclipse.net4j.util.container.IManagedContainer.ContainerAware;
import org.eclipse.net4j.util.factory.ProductCreationException;
@@ -43,7 +44,7 @@ import java.util.concurrent.ExecutorService;
* @author Eike Stepper
* @since 4.3
*/
-public class HomeFolderHandler implements InternalSecurityManager.CommitHandler2
+public class HomeFolderHandler implements InternalSecurityManager.CommitHandler2, IExecutorServiceProvider
{
public static final String DEFAULT_HOME_FOLDER = "/home";
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/CommitManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/CommitManager.java
index c7d88e390c..51511df7b9 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/CommitManager.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/CommitManager.java
@@ -11,21 +11,25 @@
*/
package org.eclipse.emf.cdo.internal.server;
+import org.eclipse.emf.cdo.common.protocol.CDODataInput;
import org.eclipse.emf.cdo.spi.server.InternalCommitContext;
import org.eclipse.emf.cdo.spi.server.InternalCommitManager;
import org.eclipse.emf.cdo.spi.server.InternalRepository;
import org.eclipse.emf.cdo.spi.server.InternalTransaction;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
+import org.eclipse.net4j.util.io.IOUtil;
import org.eclipse.net4j.util.lifecycle.Lifecycle;
import org.eclipse.net4j.util.om.monitor.OMMonitor;
+import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
@@ -62,8 +66,13 @@ public class CommitManager extends Lifecycle implements InternalCommitManager
{
if (executors == null)
{
- shutdownExecutorService = true;
- executors = Executors.newFixedThreadPool(10);
+ executors = ConcurrencyUtil.getExecutorService(repository);
+
+ if (executors == null)
+ {
+ shutdownExecutorService = true;
+ executors = ThreadPool.create();
+ }
}
return executors;
@@ -73,7 +82,11 @@ public class CommitManager extends Lifecycle implements InternalCommitManager
{
if (shutdownExecutorService)
{
- this.executors.shutdown();
+ if (this.executors != null)
+ {
+ this.executors.shutdown();
+ }
+
shutdownExecutorService = false;
}
@@ -87,12 +100,15 @@ public class CommitManager extends Lifecycle implements InternalCommitManager
setExecutors(null);
}
- /**
- * Create a future to execute commitContext in a different thread.
- */
+ @Deprecated
public void preCommit(InternalCommitContext commitContext, OMMonitor monitor)
{
- TransactionCommitContextEntry contextEntry = new TransactionCommitContextEntry(monitor);
+ preCommit(commitContext, null, monitor);
+ }
+
+ public void preCommit(InternalCommitContext commitContext, CDODataInput in, OMMonitor monitor)
+ {
+ TransactionCommitContextEntry contextEntry = new TransactionCommitContextEntry(in, monitor);
contextEntry.setContext(commitContext);
Future<Object> future = getExecutors().submit(contextEntry.createCallable());
@@ -145,14 +161,17 @@ public class CommitManager extends Lifecycle implements InternalCommitManager
*/
private static final class TransactionCommitContextEntry
{
+ private final CDODataInput in;
+
+ private final OMMonitor monitor;
+
private InternalCommitContext context;
private Future<Object> future;
- private OMMonitor monitor;
-
- public TransactionCommitContextEntry(OMMonitor monitor)
+ public TransactionCommitContextEntry(CDODataInput in, OMMonitor monitor)
{
+ this.in = in;
this.monitor = monitor;
}
@@ -163,6 +182,12 @@ public class CommitManager extends Lifecycle implements InternalCommitManager
public Object call() throws Exception
{
context.write(monitor);
+
+ if (in instanceof Closeable)
+ {
+ IOUtil.closeSilent((Closeable)in);
+ }
+
return null;
}
};
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/QueryManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/QueryManager.java
index 00a34fb1f4..42fa968be2 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/QueryManager.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/QueryManager.java
@@ -28,6 +28,8 @@ import org.eclipse.emf.cdo.spi.server.InternalRepository;
import org.eclipse.emf.cdo.spi.server.InternalSession;
import org.eclipse.emf.cdo.spi.server.InternalView;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
import org.eclipse.net4j.util.container.IContainerDelta.Kind;
import org.eclipse.net4j.util.container.SingleDeltaContainerEvent;
import org.eclipse.net4j.util.event.IEvent;
@@ -38,7 +40,6 @@ import org.eclipse.net4j.util.om.trace.ContextTracer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
@@ -85,8 +86,13 @@ public class QueryManager extends Lifecycle implements InternalQueryManager
{
if (executors == null)
{
- shutdownExecutorService = true;
- executors = Executors.newFixedThreadPool(10);
+ executors = ConcurrencyUtil.getExecutorService(repository);
+
+ if (executors == null)
+ {
+ shutdownExecutorService = true;
+ executors = ThreadPool.create();
+ }
}
return executors;
@@ -96,7 +102,11 @@ public class QueryManager extends Lifecycle implements InternalQueryManager
{
if (shutdownExecutorService)
{
- this.executors.shutdown();
+ if (this.executors != null)
+ {
+ this.executors.shutdown();
+ }
+
shutdownExecutorService = false;
}
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Repository.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Repository.java
index 338e9de661..25ede606b0 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Repository.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Repository.java
@@ -104,6 +104,8 @@ import org.eclipse.net4j.util.StringUtil;
import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.collection.MoveableList;
import org.eclipse.net4j.util.collection.Pair;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType;
import org.eclipse.net4j.util.concurrent.RWOLockManager.LockState;
import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException;
@@ -138,13 +140,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
/**
* @author Eike Stepper
* @since 2.0
*/
-public class Repository extends Container<Object>implements InternalRepository
+public class Repository extends Container<Object>implements InternalRepository, IExecutorServiceProvider
{
private static final int UNCHUNKED = CDORevision.UNCHUNKED;
@@ -1248,7 +1251,8 @@ public class Repository extends Container<Object>implements InternalRepository
if (queryHandlerProvider == null)
{
- queryHandlerProvider = new ContainerQueryHandlerProvider(getContainer());
+ IManagedContainer container = getContainer();
+ queryHandlerProvider = new ContainerQueryHandlerProvider(container);
}
IQueryHandler handler = queryHandlerProvider.getQueryHandler(info);
@@ -1275,6 +1279,12 @@ public class Repository extends Container<Object>implements InternalRepository
this.container = container;
}
+ public ExecutorService getExecutorService()
+ {
+ IManagedContainer container = getContainer();
+ return ConcurrencyUtil.getExecutorService(container);
+ }
+
public Object[] getElements()
{
final Object[] elements = { packageRegistry, branchManager, revisionManager, sessionManager, queryManager,
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/InternalCommitManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/InternalCommitManager.java
index 37ea0935c8..d5c072c028 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/InternalCommitManager.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/InternalCommitManager.java
@@ -10,6 +10,8 @@
*/
package org.eclipse.emf.cdo.spi.server;
+import org.eclipse.emf.cdo.common.protocol.CDODataInput;
+
import org.eclipse.net4j.util.om.monitor.OMMonitor;
import java.util.concurrent.ExecutionException;
@@ -30,10 +32,19 @@ public interface InternalCommitManager
/**
* Create a future to execute commitContext in a different thread.
+ *
+ * @deprecated As of 4.5 use {@link #preCommit(InternalCommitContext, CDODataInput, OMMonitor)}.
*/
+ @Deprecated
public void preCommit(InternalCommitContext commitContext, OMMonitor monitor);
/**
+ * Create a future to execute commitContext in a different thread.
+ * @since 4.5
+ */
+ public void preCommit(InternalCommitContext commitContext, CDODataInput in, OMMonitor monitor);
+
+ /**
* Called after a commitContext is done successfully or not.
*/
public void remove(InternalCommitContext commitContext);
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/LockingManagerTest.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/LockingManagerTest.java
index 199c922446..0746b51808 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/LockingManagerTest.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/LockingManagerTest.java
@@ -42,7 +42,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.AssertionFailedError;
@@ -184,7 +183,7 @@ public class LockingManagerTest extends AbstractLockingTest
}
};
- ExecutorService executors = Executors.newFixedThreadPool(10);
+ ExecutorService executors = getExecutorService();
Set<Integer> keys = new HashSet<Integer>();
keys.add(1);
keys.add(2);
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java
index ae59a321aa..699e2d2303 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java
@@ -13,9 +13,11 @@ package org.eclipse.emf.cdo.tests.config.impl;
import org.eclipse.emf.cdo.tests.config.IConfig;
import org.eclipse.net4j.util.ObjectUtil;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
@@ -24,6 +26,8 @@ public abstract class Config implements IConfig
{
private static final long serialVersionUID = 1L;
+ protected static ExecutorService executorService = ThreadPool.create("test", 20, 100, 30);
+
private String name;
private transient ConfigTest currentTest;
@@ -109,4 +113,9 @@ public abstract class Config implements IConfig
public void tearDown() throws Exception
{
}
+
+ public static ExecutorService getExecutorService()
+ {
+ return executorService;
+ }
}
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTest.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTest.java
index d56556a93c..0b8541d715 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTest.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTest.java
@@ -78,6 +78,7 @@ import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
import junit.framework.TestResult;
@@ -100,6 +101,11 @@ public abstract class ConfigTest extends AbstractOMTest implements IConstants
{
}
+ public ExecutorService getExecutorService()
+ {
+ return Config.getExecutorService();
+ }
+
public synchronized IScenario getScenario()
{
if (scenario == null)
@@ -574,8 +580,8 @@ public abstract class ConfigTest extends AbstractOMTest implements IConstants
@Override
public String toString()
{
- return MessageFormat.format("{0}.{1} [{2}, {3}, {4}]", getClass().getSimpleName(), getName(),
- getRepositoryConfig(), getSessionConfig(), getModelConfig());
+ return MessageFormat.format("{0}.{1} [{2}, {3}, {4}]", getClass().getSimpleName(), getName(), getRepositoryConfig(),
+ getSessionConfig(), getModelConfig());
}
@Override
@@ -861,7 +867,7 @@ public abstract class ConfigTest extends AbstractOMTest implements IConstants
@Target({ ElementType.TYPE, ElementType.METHOD })
public @interface Requires
{
- String[] value();
+ String[]value();
}
@Inherited
@@ -869,6 +875,6 @@ public abstract class ConfigTest extends AbstractOMTest implements IConstants
@Target({ ElementType.TYPE, ElementType.METHOD })
public @interface Skips
{
- String[] value();
+ String[]value();
}
}
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTestSuite.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTestSuite.java
index d69d5f5428..5b8964115a 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTestSuite.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTestSuite.java
@@ -71,7 +71,7 @@ public abstract class ConfigTestSuite implements IConstants
{
try
{
- TestWrapper wrapper = new TestWrapper(testClass, scenario, this);
+ ScenarioSuite wrapper = new ScenarioSuite(testClass, scenario, this);
if (wrapper.testCount() != 0)
{
suite.addTest(wrapper);
@@ -152,36 +152,28 @@ public abstract class ConfigTestSuite implements IConstants
/**
* @author Eike Stepper
*/
- private static final class ConstraintsViolatedException extends Exception
- {
- private static final long serialVersionUID = 1L;
- }
-
- /**
- * @author Eike Stepper
- */
- private static final class TestWrapper extends TestSuite
+ private static final class ScenarioSuite extends TestSuite
{
private IScenario scenario;
-
- public TestWrapper(Class<? extends ConfigTest> testClass, IScenario scenario, ConfigTestSuite suite)
+
+ public ScenarioSuite(Class<? extends ConfigTest> testClass, IScenario scenario, ConfigTestSuite suite)
throws ConstraintsViolatedException
{
// super(testClass, testClass.getName()); // Important for the UI to set the *qualified* class name!
this.scenario = scenario;
addTestsFromTestCase(testClass, suite);
}
-
+
@Override
public void runTest(Test test, TestResult result)
{
if (test instanceof ConfigTest)
{
scenario.save();
-
+
ConfigTest configTest = (ConfigTest)test;
configTest.setScenario(scenario);
-
+
if (configTest.isValid())
{
super.runTest(configTest, result);
@@ -192,12 +184,12 @@ public abstract class ConfigTestSuite implements IConstants
super.runTest(test, result);
}
}
-
+
private void addTestsFromTestCase(final Class<?> theClass, ConfigTestSuite suite)
throws ConstraintsViolatedException
{
setName(theClass.getName());
-
+
try
{
getTestConstructor(theClass); // Avoid generating multiple error messages
@@ -208,15 +200,15 @@ public abstract class ConfigTestSuite implements IConstants
+ " has no public constructor TestCase(String name) or TestCase()"));
return;
}
-
+
if (!Modifier.isPublic(theClass.getModifiers()))
{
addTest(warning("Class " + theClass.getName() + " is not public"));
return;
}
-
+
Set<String> capabilities = scenario.getCapabilities();
-
+
Class<?> superClass = theClass;
while (Test.class.isAssignableFrom(superClass))
{
@@ -224,12 +216,12 @@ public abstract class ConfigTestSuite implements IConstants
{
throw new ConstraintsViolatedException();
}
-
+
superClass = superClass.getSuperclass();
}
-
+
List<String> names = new ArrayList<String>();
-
+
superClass = theClass;
while (Test.class.isAssignableFrom(superClass))
{
@@ -240,11 +232,11 @@ public abstract class ConfigTestSuite implements IConstants
addTestMethod(method, names, theClass, suite);
}
}
-
+
superClass = superClass.getSuperclass();
}
}
-
+
private boolean validateConstraints(AnnotatedElement element, Set<String> capabilities)
{
Requires requires = element.getAnnotation(Requires.class);
@@ -258,7 +250,7 @@ public abstract class ConfigTestSuite implements IConstants
}
}
}
-
+
Skips skips = element.getAnnotation(Skips.class);
if (skips != null)
{
@@ -270,10 +262,10 @@ public abstract class ConfigTestSuite implements IConstants
}
}
}
-
+
return true;
}
-
+
private void addTestMethod(Method m, List<String> names, Class<?> theClass, ConfigTestSuite suite)
{
String name = m.getName();
@@ -281,17 +273,17 @@ public abstract class ConfigTestSuite implements IConstants
{
return;
}
-
+
if (!isPublicTestMethod(m))
{
if (isTestMethod(m))
{
addTest(warning("Test method isn't public: " + m.getName() + "(" + theClass.getCanonicalName() + ")"));
}
-
+
return;
}
-
+
names.add(name);
Test test = createTest(theClass, name);
if (test instanceof ConfigTest)
@@ -299,18 +291,26 @@ public abstract class ConfigTestSuite implements IConstants
ConfigTest configTest = (ConfigTest)test;
suite.prepareTest(configTest);
}
-
+
addTest(test);
}
-
+
private boolean isPublicTestMethod(Method m)
{
return isTestMethod(m) && Modifier.isPublic(m.getModifiers());
}
-
+
private boolean isTestMethod(Method m)
{
return m.getParameterTypes().length == 0 && m.getName().startsWith("test") && m.getReturnType().equals(Void.TYPE);
}
}
+
+ /**
+ * @author Eike Stepper
+ */
+ private static final class ConstraintsViolatedException extends Exception
+ {
+ private static final long serialVersionUID = 1L;
+ }
}
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java
index 76a9ff1782..d3e8aeb855 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java
@@ -64,6 +64,8 @@ import org.eclipse.net4j.jvm.JVMUtil;
import org.eclipse.net4j.util.ObjectUtil;
import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.DelegatingExecutorService;
+import org.eclipse.net4j.util.concurrent.ExecutorServiceFactory;
import org.eclipse.net4j.util.container.ContainerUtil;
import org.eclipse.net4j.util.container.IManagedContainer;
import org.eclipse.net4j.util.event.IEvent;
@@ -92,6 +94,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
@@ -242,6 +245,16 @@ public abstract class RepositoryConfig extends Config implements IRepositoryConf
IManagedContainer container = ContainerUtil.createContainer();
Net4jUtil.prepareContainer(container);
CDONet4jServerUtil.prepareContainer(container);
+
+ container.registerFactory(new ExecutorServiceFactory()
+ {
+ @Override
+ public ExecutorService create(String threadGroupName)
+ {
+ return new DelegatingExecutorService(executorService);
+ }
+ });
+
return container;
}
@@ -533,8 +546,8 @@ public abstract class RepositoryConfig extends Config implements IRepositoryConf
if (!path.startsWith(prefix) && !hasAnnotation(CleanRepositoriesBefore.class))
{
throw new RuntimeException("Test case " + test.getClass().getName() + '.' + test.getName()
- + " does not use getResourcePath() for resource " + path + ", nor does it declare @"
- + CleanRepositoriesBefore.class.getSimpleName());
+ + " does not use getResourcePath() for resource " + path + ", nor does it declare @"
+ + CleanRepositoriesBefore.class.getSimpleName());
}
}
}
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java
index 8cb638ea12..4f3ed8bbe3 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java
@@ -32,6 +32,8 @@ import org.eclipse.net4j.connector.IConnector;
import org.eclipse.net4j.jvm.JVMUtil;
import org.eclipse.net4j.tcp.TCPUtil;
import org.eclipse.net4j.tcp.ssl.SSLUtil;
+import org.eclipse.net4j.util.concurrent.DelegatingExecutorService;
+import org.eclipse.net4j.util.concurrent.ExecutorServiceFactory;
import org.eclipse.net4j.util.container.ContainerUtil;
import org.eclipse.net4j.util.container.IManagedContainer;
import org.eclipse.net4j.util.event.IListener;
@@ -42,13 +44,13 @@ import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.security.IPasswordCredentialsProvider;
import org.eclipse.emf.ecore.EPackage;
-import org.eclipse.emf.ecore.EPackage.Registry;
import org.eclipse.emf.ecore.impl.EPackageImpl;
import java.io.File;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
@@ -61,7 +63,7 @@ public abstract class SessionConfig extends Config implements ISessionConfig
public static final String PROP_TEST_FETCH_RULE_MANAGER = "test.session.FetchRuleManager";
- private static final Registry GLOBAL_REGISTRY = EPackage.Registry.INSTANCE;
+ private static final EPackage.Registry GLOBAL_REGISTRY = EPackage.Registry.INSTANCE;
private static final long serialVersionUID = 1L;
@@ -123,6 +125,16 @@ public abstract class SessionConfig extends Config implements ISessionConfig
{
IManagedContainer container = ContainerUtil.createContainer();
Net4jUtil.prepareContainer(container);
+
+ container.registerFactory(new ExecutorServiceFactory()
+ {
+ @Override
+ public ExecutorService create(String threadGroupName)
+ {
+ return new DelegatingExecutorService(executorService);
+ }
+ });
+
return container;
}
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java
index 9062449980..4e992c18e2 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java
@@ -96,10 +96,12 @@ import org.eclipse.emf.internal.cdo.util.DefaultLocksChangedEvent;
import org.eclipse.net4j.util.AdapterUtil;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
import org.eclipse.net4j.util.WrappedException;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.concurrent.IRWLockManager;
import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType;
import org.eclipse.net4j.util.concurrent.IRWOLockManager;
-import org.eclipse.net4j.util.concurrent.QueueRunner2;
import org.eclipse.net4j.util.concurrent.RWOLockManager;
import org.eclipse.net4j.util.event.Event;
import org.eclipse.net4j.util.event.EventUtil;
@@ -149,11 +151,13 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
*/
-public abstract class CDOSessionImpl extends CDOTransactionContainerImpl implements InternalCDOSession
+public abstract class CDOSessionImpl extends CDOTransactionContainerImpl
+ implements InternalCDOSession, IExecutorServiceProvider
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SESSION, CDOSessionImpl.class);
@@ -363,6 +367,11 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
return fetchRuleManager;
}
+ public ExecutorService getExecutorService()
+ {
+ return ConcurrencyUtil.getExecutorService(sessionProtocol);
+ }
+
/**
* @since 3.0
*/
@@ -1308,6 +1317,9 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
protected void doAfterActivate() throws Exception
{
super.doAfterActivate();
+
+ ExecutorService executorService = ConcurrencyUtil.getExecutorService(sessionProtocol);
+ invalidator.setExecutor(executorService);
LifecycleUtil.activate(invalidator);
}
@@ -1738,7 +1750,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
/**
* @author Eike Stepper
*/
- private class Invalidator extends QueueRunner2<Invalidation>
+ private class Invalidator extends ExecutorWorkSerializer
{
private static final boolean DEBUG = false;
@@ -1820,11 +1832,11 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
}
@Override
- protected void noWork(WorkContext context)
+ protected void noWork()
{
if (isClosed() && terminateIfSessionClosed)
{
- context.terminate();
+ dispose();
}
}
@@ -1834,12 +1846,6 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
super.doAfterActivate();
terminateIfSessionClosed = true;
}
-
- @Override
- protected String getThreadName()
- {
- return "CDOSessionInvalidator-" + CDOSessionImpl.this;
- }
}
/**
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/DelegatingSessionProtocol.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/DelegatingSessionProtocol.java
index f271273012..7170faedd9 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/DelegatingSessionProtocol.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/DelegatingSessionProtocol.java
@@ -47,6 +47,8 @@ import org.eclipse.emf.cdo.view.CDOView;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.collection.Pair;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType;
import org.eclipse.net4j.util.event.EventUtil;
import org.eclipse.net4j.util.event.IListener;
@@ -70,11 +72,12 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
*/
-public class DelegatingSessionProtocol extends Lifecycle implements CDOSessionProtocol
+public class DelegatingSessionProtocol extends Lifecycle implements CDOSessionProtocol, IExecutorServiceProvider
{
private CDOSessionProtocol delegate;
@@ -122,6 +125,11 @@ public class DelegatingSessionProtocol extends Lifecycle implements CDOSessionPr
return (CDOSession)delegate.getSession();
}
+ public ExecutorService getExecutorService()
+ {
+ return ConcurrencyUtil.getExecutorService(delegate);
+ }
+
public boolean cancelQuery(int queryId)
{
int attempt = 0;
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java
index 8b6cd1ca12..1a28863d05 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java
@@ -67,8 +67,11 @@ import org.eclipse.net4j.util.ObjectUtil;
import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.collection.HashBag;
import org.eclipse.net4j.util.collection.Pair;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType;
-import org.eclipse.net4j.util.concurrent.QueueRunner;
+import org.eclipse.net4j.util.concurrent.IWorkSerializer;
import org.eclipse.net4j.util.event.IEvent;
import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.event.Notifier;
@@ -111,11 +114,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
+import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
*/
-public class CDOViewImpl extends AbstractCDOView
+public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProvider
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_VIEW, CDOViewImpl.class);
@@ -137,7 +141,7 @@ public class CDOViewImpl extends AbstractCDOView
private Map<CDOObject, CDOLockState> lockStates = new WeakHashMap<CDOObject, CDOLockState>();
- private QueueRunner invalidationRunner = createInvalidationRunner();
+ private ExecutorWorkSerializer invalidationRunner = createInvalidationRunner();
private volatile boolean invalidationRunnerActive;
@@ -177,6 +181,11 @@ public class CDOViewImpl extends AbstractCDOView
viewID = viewId;
}
+ public ExecutorService getExecutorService()
+ {
+ return ConcurrencyUtil.getExecutorService(session);
+ }
+
/**
* @since 2.0
*/
@@ -932,8 +941,8 @@ public class CDOViewImpl extends AbstractCDOView
{
if (async)
{
- QueueRunner runner = getInvalidationRunner();
- runner.addWork(new InvalidationRunnable(branch, lastUpdateTime, allChangedObjects, allDetachedObjects,
+ IWorkSerializer serializer = getInvalidationRunner();
+ serializer.addWork(new InvalidationRunnable(branch, lastUpdateTime, allChangedObjects, allDetachedObjects,
oldRevisions, clearResourcePathCache));
}
else
@@ -997,35 +1006,23 @@ public class CDOViewImpl extends AbstractCDOView
}
}
- public QueueRunner getInvalidationRunner()
+ public ExecutorWorkSerializer getInvalidationRunner()
{
return invalidationRunner;
}
- private QueueRunner createInvalidationRunner()
+ private ExecutorWorkSerializer createInvalidationRunner()
{
- return new QueueRunner()
+ return new ExecutorWorkSerializer()
{
@Override
- protected String getThreadName()
- {
- return "CDOViewInvalidationRunner-" + CDOViewImpl.this; //$NON-NLS-1$
- }
-
- @Override
- protected void noWork(WorkContext context)
+ protected void noWork()
{
if (isClosed())
{
- context.terminate();
+ dispose();
}
}
-
- @Override
- public String toString()
- {
- return getThreadName();
- }
};
}
@@ -1279,9 +1276,12 @@ public class CDOViewImpl extends AbstractCDOView
{
super.doAfterActivate();
+ ExecutorService executorService = ConcurrencyUtil.getExecutorService(session);
+ invalidationRunner.setExecutor(executorService);
+
try
{
- invalidationRunner.activate();
+ LifecycleUtil.activate(invalidationRunner);
}
catch (LifecycleException ex)
{
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/AllTests.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/AllTests.java
index 1f9c1a12ee..f39d73c3c3 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/AllTests.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/AllTests.java
@@ -13,10 +13,10 @@ package org.eclipse.net4j.tests;
import org.eclipse.net4j.tests.bugzilla.Bugzilla_241463_Test;
import org.eclipse.net4j.tests.bugzilla.Bugzilla_259086_Test;
import org.eclipse.net4j.tests.bugzilla.Bugzilla_262875_Test;
+import org.eclipse.net4j.util.tests.ExecutorWorkSerializerTest;
import org.eclipse.net4j.util.tests.ExpectedIOTest;
import org.eclipse.net4j.util.tests.ExtendedIOTest;
import org.eclipse.net4j.util.tests.MultiMapTest;
-import org.eclipse.net4j.util.tests.QueueWorkerWorkSerializerTest;
import org.eclipse.net4j.util.tests.ReferenceValueMapTest;
import org.eclipse.net4j.util.tests.SecurityTest;
import org.eclipse.net4j.util.tests.SortedFileMapTest;
@@ -52,7 +52,7 @@ public class AllTests
suite.addTestSuite(SignalMonitorTest.TCP.class);
suite.addTestSuite(ExceptionTest.TCP.class);
suite.addTestSuite(SecurityTest.class);
- suite.addTestSuite(QueueWorkerWorkSerializerTest.class);
+ suite.addTestSuite(ExecutorWorkSerializerTest.class);
suite.addTestSuite(ExpectedIOTest.class);
// Bugzillas
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/TCPConnectorTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/TCPConnectorTest.java
index ccc0ff7cd4..d3c555cc28 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/TCPConnectorTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/TCPConnectorTest.java
@@ -31,6 +31,7 @@ import org.eclipse.net4j.tcp.TCPUtil;
import org.eclipse.net4j.tests.bundle.OM;
import org.eclipse.net4j.util.collection.RoundRobinBlockingQueue;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
import org.eclipse.net4j.util.io.IOUtil;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.security.ChallengeNegotiator;
@@ -47,7 +48,6 @@ import org.eclipse.spi.net4j.InternalChannel;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* @author Eike Stepper
@@ -221,9 +221,7 @@ public class TCPConnectorTest extends AbstractTransportTest
public void testDeferredActivation() throws Exception
{
final long DELAY = 500L;
- threadPool = Executors.newCachedThreadPool();
- LifecycleUtil.activate(threadPool);
-
+ threadPool = ThreadPool.create();
bufferPool = Net4jUtil.createBufferPool();
LifecycleUtil.activate(bufferPool);
@@ -272,9 +270,7 @@ public class TCPConnectorTest extends AbstractTransportTest
public void testNegotiationSuccess() throws Exception
{
- threadPool = Executors.newCachedThreadPool();
- LifecycleUtil.activate(threadPool);
-
+ threadPool = ThreadPool.create();
bufferPool = Net4jUtil.createBufferPool();
LifecycleUtil.activate(bufferPool);
@@ -351,9 +347,7 @@ public class TCPConnectorTest extends AbstractTransportTest
public void testInvalidUser() throws Exception
{
- threadPool = Executors.newCachedThreadPool();
- LifecycleUtil.activate(threadPool);
-
+ threadPool = ThreadPool.create();
bufferPool = Net4jUtil.createBufferPool();
LifecycleUtil.activate(bufferPool);
@@ -412,9 +406,7 @@ public class TCPConnectorTest extends AbstractTransportTest
public void testInvalidPassword() throws Exception
{
- threadPool = Executors.newCachedThreadPool();
- LifecycleUtil.activate(threadPool);
-
+ threadPool = ThreadPool.create();
bufferPool = Net4jUtil.createBufferPool();
LifecycleUtil.activate(bufferPool);
@@ -473,9 +465,7 @@ public class TCPConnectorTest extends AbstractTransportTest
public void testNoNegotiator() throws Exception
{
- threadPool = Executors.newCachedThreadPool();
- LifecycleUtil.activate(threadPool);
-
+ threadPool = ThreadPool.create();
bufferPool = Net4jUtil.createBufferPool();
LifecycleUtil.activate(bufferPool);
@@ -513,9 +503,7 @@ public class TCPConnectorTest extends AbstractTransportTest
public void testNegotiatorTooLate() throws Exception
{
- threadPool = Executors.newCachedThreadPool();
- LifecycleUtil.activate(threadPool);
-
+ threadPool = ThreadPool.create();
bufferPool = Net4jUtil.createBufferPool();
LifecycleUtil.activate(bufferPool);
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMAcceptorDefImplTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMAcceptorDefImplTest.java
index 5fce1508d8..1abe83ff23 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMAcceptorDefImplTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMAcceptorDefImplTest.java
@@ -18,12 +18,12 @@ import org.eclipse.net4j.defs.Net4jDefsFactory;
import org.eclipse.net4j.internal.jvm.JVMClientConnector;
import org.eclipse.net4j.jvm.IJVMAcceptor;
import org.eclipse.net4j.jvm.IJVMConnector;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
import org.eclipse.net4j.util.defs.Net4jUtilDefsFactory;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.tests.AbstractOMTest;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* @author Andre Dietisheim
@@ -88,8 +88,6 @@ public class JVMAcceptorDefImplTest extends AbstractOMTest
private ExecutorService createThreadPool()
{
- ExecutorService threadPool = Executors.newCachedThreadPool();
- LifecycleUtil.activate(threadPool);
- return threadPool;
+ return ThreadPool.create();
}
}
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMConnectorDefImplTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMConnectorDefImplTest.java
index 0efd62a212..97d079626b 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMConnectorDefImplTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMConnectorDefImplTest.java
@@ -18,12 +18,12 @@ import org.eclipse.net4j.defs.Net4jDefsFactory;
import org.eclipse.net4j.internal.jvm.JVMAcceptor;
import org.eclipse.net4j.jvm.IJVMAcceptor;
import org.eclipse.net4j.jvm.IJVMConnector;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
import org.eclipse.net4j.util.defs.Net4jUtilDefsFactory;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.tests.AbstractOMTest;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* @author Andre Dietisheim
@@ -63,8 +63,7 @@ public class JVMConnectorDefImplTest extends AbstractOMTest
private IJVMAcceptor createJVMAcceptor()
{
- ExecutorService threadPool = Executors.newCachedThreadPool();
- LifecycleUtil.activate(threadPool);
+ ExecutorService threadPool = ThreadPool.create();
IBufferPool bufferPool = Net4jUtil.createBufferPool();
LifecycleUtil.activate(bufferPool);
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/Util.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/Util.java
index 5337a886d0..2c04cdfe9c 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/Util.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/Util.java
@@ -16,6 +16,7 @@ import org.eclipse.net4j.internal.tcp.TCPAcceptor;
import org.eclipse.net4j.internal.tcp.TCPClientConnector;
import org.eclipse.net4j.internal.tcp.TCPConnector;
import org.eclipse.net4j.internal.tcp.TCPSelector;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.security.ChallengeNegotiator;
import org.eclipse.net4j.util.security.INegotiator;
@@ -27,7 +28,6 @@ import org.eclipse.net4j.util.security.ResponseNegotiator;
import org.eclipse.net4j.util.security.UserManager;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* @author Eike Stepper
@@ -131,8 +131,6 @@ class Util
public static ExecutorService createThreadPool()
{
- ExecutorService threadPool = Executors.newCachedThreadPool();
- LifecycleUtil.activate(threadPool);
- return threadPool;
+ return ThreadPool.create();
}
}
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/AbstractOMTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/AbstractOMTest.java
index 78f60077cf..950d8db519 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/AbstractOMTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/AbstractOMTest.java
@@ -58,8 +58,14 @@ import junit.framework.TestResult;
*/
public abstract class AbstractOMTest extends TestCase
{
+ /**
+ * Timeout duration in millseconds if timeout <b>is not</b> expected.
+ */
public static final long DEFAULT_TIMEOUT = 15 * 1000;
+ /**
+ * Timeout duration in millseconds if timeout <b>is</b> expected.
+ */
public static final long DEFAULT_TIMEOUT_EXPECTED = 3 * 1000;
public static boolean EXTERNAL_LOG;
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/QueueWorkerWorkSerializerTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java
index 0ee4714626..4aea456412 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/QueueWorkerWorkSerializerTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java
@@ -11,7 +11,7 @@
*/
package org.eclipse.net4j.util.tests;
-import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer;
+import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
import org.eclipse.net4j.util.io.IOUtil;
import java.util.Random;
@@ -22,11 +22,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * A test for {@link QueueWorkerWorkSerializer}.
+ * A test for {@link ExecutorWorkSerializer}.
*
* @author Andre Dietisheim
*/
-public class QueueWorkerWorkSerializerTest extends AbstractOMTest
+public class ExecutorWorkSerializerTest extends AbstractOMTest
{
/** timeout to wait for execution of all work units. */
private static final int WORK_COMPLETION_TIMEOUT = 10000;
@@ -47,22 +47,24 @@ public class QueueWorkerWorkSerializerTest extends AbstractOMTest
private ExecutorService threadPool;
/** The queue worker to submit the work units to. */
- private QueueWorkerWorkSerializer queueWorker;
+ private ExecutorWorkSerializer queueWorker;
@Override
public void setUp()
{
- threadPool = Executors.newFixedThreadPool(NUM_WORKPRODUCER_THREADS);
- workConsumedLatch = new CountDownLatch(NUM_WORK);
- queueWorker = new QueueWorkerWorkSerializer();
workProduced = new AtomicInteger(0);
+ workConsumedLatch = new CountDownLatch(NUM_WORK);
+
+ threadPool = Executors.newFixedThreadPool(NUM_WORKPRODUCER_THREADS);
+ queueWorker = new ExecutorWorkSerializer(threadPool);
+ queueWorker.activate();
}
@Override
public void tearDown()
{
- threadPool.shutdown();
queueWorker.dispose();
+ threadPool.shutdown();
}
/**
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java
new file mode 100644
index 0000000000..8ce34d151e
--- /dev/null
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.net4j.util.tests;
+
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A test for {@link ThreadPool}.
+ *
+ * @author Eike Stepper
+ */
+public class ThreadPoolTest extends AbstractOMTest
+{
+ public void testThreadPool() throws Exception
+ {
+ final ThreadPool pool = ThreadPool.create("test", 100, 200, 60);
+
+ try
+ {
+ final int tasks = pool.getMaximumPoolSize() + 100;
+ final CountDownLatch latch = new CountDownLatch(tasks);
+
+ for (int i = 0; i < tasks; i++)
+ {
+ final int n = i;
+ msg("scheduling " + n);
+ pool.submit(new Runnable()
+ {
+ public void run()
+ {
+ msg("started " + n + " (wc=" + pool.getPoolSize() + ")");
+ ConcurrencyUtil.sleep(1000);
+ latch.countDown();
+ }
+ });
+ }
+
+ latch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+ msg("FINISHED");
+ }
+ finally
+ {
+ pool.shutdownNow();
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j.util.defs/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j.util.defs/META-INF/MANIFEST.MF
index 5308210d6e..03926bce84 100644
--- a/plugins/org.eclipse.net4j.util.defs/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.net4j.util.defs/META-INF/MANIFEST.MF
@@ -2,14 +2,14 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: %pluginName
Bundle-SymbolicName: org.eclipse.net4j.util.defs;singleton:=true
-Bundle-Version: 4.1.200.qualifier
+Bundle-Version: 4.1.300.qualifier
Bundle-ClassPath: .
Bundle-Vendor: %providerName
Bundle-Localization: plugin
Bundle-RequiredExecutionEnvironment: J2SE-1.5
-Export-Package: org.eclipse.net4j.util.defs;version="4.1.200",
- org.eclipse.net4j.util.defs.impl;version="4.1.200",
- org.eclipse.net4j.util.defs.util;version="4.1.200"
+Export-Package: org.eclipse.net4j.util.defs;version="4.1.300",
+ org.eclipse.net4j.util.defs.impl;version="4.1.300",
+ org.eclipse.net4j.util.defs.util;version="4.1.300"
Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";visibility:=reexport,
org.eclipse.emf.ecore;bundle-version="[2.5.0,3.0.0)";visibility:=reexport,
org.eclipse.net4j.util;bundle-version="[3.0.0,4.0.0)";visibility:=reexport
diff --git a/plugins/org.eclipse.net4j.util.defs/src/org/eclipse/net4j/util/defs/impl/ThreadPoolDefImpl.java b/plugins/org.eclipse.net4j.util.defs/src/org/eclipse/net4j/util/defs/impl/ThreadPoolDefImpl.java
index 777da0dc40..1e797958ea 100644
--- a/plugins/org.eclipse.net4j.util.defs/src/org/eclipse/net4j/util/defs/impl/ThreadPoolDefImpl.java
+++ b/plugins/org.eclipse.net4j.util.defs/src/org/eclipse/net4j/util/defs/impl/ThreadPoolDefImpl.java
@@ -11,14 +11,13 @@
*/
package org.eclipse.net4j.util.defs.impl;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
import org.eclipse.net4j.util.defs.Net4jUtilDefsPackage;
import org.eclipse.net4j.util.defs.ThreadPoolDef;
import org.eclipse.emf.ecore.EClass;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
/**
* <!-- begin-user-doc --> An implementation of the model object '<em><b>Thread Pool Def</b></em>'. <!-- end-user-doc
@@ -30,9 +29,6 @@ import java.util.concurrent.ThreadFactory;
*/
public class ThreadPoolDefImpl extends ExecutorServiceDefImpl implements ThreadPoolDef
{
-
- private static final String THREADGROUP_IDENTIFIER = "net4j"; //$NON-NLS-1$
-
/**
* <!-- begin-user-doc --> <!-- end-user-doc -->
* @generated
@@ -61,26 +57,8 @@ public class ThreadPoolDefImpl extends ExecutorServiceDefImpl implements ThreadP
@Override
protected Object createInstance()
{
- ExecutorService executorService = Executors.newCachedThreadPool(new DaemonThreadFactory(THREADGROUP_IDENTIFIER));
+ ExecutorService executorService = ThreadPool.create();
return executorService;
}
- private static class DaemonThreadFactory implements ThreadFactory
- {
- private ThreadGroup threadGroup;
-
- public DaemonThreadFactory(String threadGroupIdentifier)
- {
- super();
- threadGroup = new ThreadGroup(threadGroupIdentifier);
- }
-
- public Thread newThread(Runnable r)
- {
- Thread thread = new Thread(threadGroup, r);
- thread.setDaemon(true);
- return thread;
- }
- }
-
} // ThreadPoolDefImpl
diff --git a/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF
index e85dcff502..a1071bf9f4 100644
--- a/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-SymbolicName: org.eclipse.net4j.util;singleton:=true
-Bundle-Version: 3.5.0.qualifier
+Bundle-Version: 3.6.0.qualifier
Bundle-Name: %pluginName
Bundle-Vendor: %providerName
Bundle-Localization: plugin
@@ -15,34 +15,34 @@ Import-Package: org.eclipse.osgi.service.debug;version="[1.0.0,2.0.0)";resolutio
org.osgi.framework;version="[1.3.0,2.0.0)";resolution:=optional,
org.osgi.service.log;version="[1.3.0,2.0.0)";resolution:=optional,
org.osgi.util.tracker;version="[1.3.0,2.0.0)";resolution:=optional
-Export-Package: org.eclipse.net4j.internal.util.bundle;version="3.5.0";x-friends:="org.eclipse.net4j.util.ui,org.eclipse.net4j.tests",
- org.eclipse.net4j.internal.util.container;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs",
- org.eclipse.net4j.internal.util.factory;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs",
- org.eclipse.net4j.internal.util.om;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs",
- org.eclipse.net4j.internal.util.om.pref;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs",
- org.eclipse.net4j.internal.util.table;version="3.5.0";x-internal:=true,
- org.eclipse.net4j.internal.util.test;version="3.5.0";x-friends:="org.eclipse.net4j.tests",
- org.eclipse.net4j.util;version="3.5.0",
- org.eclipse.net4j.util.cache;version="3.5.0",
- org.eclipse.net4j.util.collection;version="3.5.0",
- org.eclipse.net4j.util.concurrent;version="3.5.0",
- org.eclipse.net4j.util.confirmation;version="3.5.0",
- org.eclipse.net4j.util.container;version="3.5.0",
- org.eclipse.net4j.util.container.delegate;version="3.5.0",
- org.eclipse.net4j.util.event;version="3.5.0",
- org.eclipse.net4j.util.factory;version="3.5.0",
- org.eclipse.net4j.util.fsm;version="3.5.0",
- org.eclipse.net4j.util.io;version="3.5.0",
- org.eclipse.net4j.util.lifecycle;version="3.5.0",
- org.eclipse.net4j.util.om;version="3.5.0",
- org.eclipse.net4j.util.om.log;version="3.5.0",
- org.eclipse.net4j.util.om.monitor;version="3.5.0",
- org.eclipse.net4j.util.om.pref;version="3.5.0",
- org.eclipse.net4j.util.om.trace;version="3.5.0",
- org.eclipse.net4j.util.options;version="3.5.0",
- org.eclipse.net4j.util.properties;version="3.5.0",
- org.eclipse.net4j.util.ref;version="3.5.0",
- org.eclipse.net4j.util.registry;version="3.5.0",
- org.eclipse.net4j.util.security;version="3.5.0",
- org.eclipse.net4j.util.transaction;version="3.5.0"
+Export-Package: org.eclipse.net4j.internal.util.bundle;version="3.6.0";x-friends:="org.eclipse.net4j.util.ui,org.eclipse.net4j.tests",
+ org.eclipse.net4j.internal.util.container;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs",
+ org.eclipse.net4j.internal.util.factory;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs",
+ org.eclipse.net4j.internal.util.om;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs",
+ org.eclipse.net4j.internal.util.om.pref;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs",
+ org.eclipse.net4j.internal.util.table;version="3.6.0";x-internal:=true,
+ org.eclipse.net4j.internal.util.test;version="3.6.0";x-friends:="org.eclipse.net4j.tests",
+ org.eclipse.net4j.util;version="3.6.0",
+ org.eclipse.net4j.util.cache;version="3.6.0",
+ org.eclipse.net4j.util.collection;version="3.6.0",
+ org.eclipse.net4j.util.concurrent;version="3.6.0",
+ org.eclipse.net4j.util.confirmation;version="3.6.0",
+ org.eclipse.net4j.util.container;version="3.6.0",
+ org.eclipse.net4j.util.container.delegate;version="3.6.0",
+ org.eclipse.net4j.util.event;version="3.6.0",
+ org.eclipse.net4j.util.factory;version="3.6.0",
+ org.eclipse.net4j.util.fsm;version="3.6.0",
+ org.eclipse.net4j.util.io;version="3.6.0",
+ org.eclipse.net4j.util.lifecycle;version="3.6.0",
+ org.eclipse.net4j.util.om;version="3.6.0",
+ org.eclipse.net4j.util.om.log;version="3.6.0",
+ org.eclipse.net4j.util.om.monitor;version="3.6.0",
+ org.eclipse.net4j.util.om.pref;version="3.6.0",
+ org.eclipse.net4j.util.om.trace;version="3.6.0",
+ org.eclipse.net4j.util.options;version="3.6.0",
+ org.eclipse.net4j.util.properties;version="3.6.0",
+ org.eclipse.net4j.util.ref;version="3.6.0",
+ org.eclipse.net4j.util.registry;version="3.6.0",
+ org.eclipse.net4j.util.security;version="3.6.0",
+ org.eclipse.net4j.util.transaction;version="3.6.0"
Eclipse-BuddyPolicy: registered
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java
index 887dc5693a..80133bffe3 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java
@@ -19,8 +19,10 @@ import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
+ * @deprecated As of 3.6 use {@link ExecutorWorkSerializer}.
*/
-public class AsynchronousWorkSerializer implements IWorkSerializer, Runnable
+@Deprecated
+public class AsynchronousWorkSerializer implements IWorkSerializer, IExecutorServiceProvider, Runnable
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONCURRENCY, AsynchronousWorkSerializer.class);
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java
index 9b4dc04680..69534b6a06 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java
@@ -18,7 +18,9 @@ import java.util.concurrent.Future;
/**
* @author Eike Stepper
+ * @deprecated As of 3.6 use {@link ExecutorWorkSerializer}.
*/
+@Deprecated
public class CompletionWorkSerializer implements IWorkSerializer
{
private CompletionService<Object> completionService;
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java
index 303cf02b29..5a9905e821 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java
@@ -54,4 +54,36 @@ public final class ConcurrencyUtil
{
return ExecutorServiceFactory.get(container);
}
+
+ /**
+ * @since 3.6
+ */
+ public static ExecutorService getExecutorService(Object object)
+ {
+ if (object instanceof IExecutorServiceProvider)
+ {
+ try
+ {
+ return ((IExecutorServiceProvider)object).getExecutorService();
+ }
+ catch (Exception ex)
+ {
+ //$FALL-THROUGH$
+ }
+ }
+
+ if (object instanceof IManagedContainer)
+ {
+ try
+ {
+ return getExecutorService((IManagedContainer)object);
+ }
+ catch (Exception ex)
+ {
+ //$FALL-THROUGH$
+ }
+ }
+
+ return null;
+ }
}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/DelegatingExecutorService.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/DelegatingExecutorService.java
new file mode 100644
index 0000000000..cbd4806086
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/DelegatingExecutorService.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2004-2015 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.net4j.util.concurrent;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * @author Eike Stepper
+ * @since 3.6
+ */
+public class DelegatingExecutorService implements ExecutorService
+{
+ private final ExecutorService delegate;
+
+ public DelegatingExecutorService(ExecutorService delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ public void execute(Runnable command)
+ {
+ delegate.execute(command);
+ }
+
+ public void shutdown()
+ {
+ // Do nothing.
+ }
+
+ public List<Runnable> shutdownNow()
+ {
+ return Collections.emptyList();
+ }
+
+ public boolean isShutdown()
+ {
+ return delegate.isShutdown();
+ }
+
+ public boolean isTerminated()
+ {
+ return delegate.isTerminated();
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ public <T> Future<T> submit(Callable<T> task)
+ {
+ return delegate.submit(task);
+ }
+
+ public <T> Future<T> submit(Runnable task, T result)
+ {
+ return delegate.submit(task, result);
+ }
+
+ public Future<?> submit(Runnable task)
+ {
+ return delegate.submit(task);
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
+ {
+ return delegate.invokeAll(tasks);
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException
+ {
+ return delegate.invokeAll(tasks, timeout, unit);
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+ {
+ return delegate.invokeAny(tasks);
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException
+ {
+ return delegate.invokeAny(tasks, timeout, unit);
+ }
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java
index 965deb6b6b..681d60dc0c 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java
@@ -20,8 +20,6 @@ import org.eclipse.net4j.util.lifecycle.LifecycleState;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
/**
* @author Eike Stepper
@@ -40,87 +38,70 @@ public class ExecutorServiceFactory extends Factory
super(PRODUCT_GROUP, TYPE);
}
- public ExecutorService create(String threadGroupName)
+ public ExecutorService create(String description)
{
- if (threadGroupName == null)
- {
- threadGroupName = DEFAULT_THREAD_GROUP_NAME;
- }
-
- final ThreadGroup threadGroup = new ThreadGroup(threadGroupName);
- ThreadFactory threadFactory = new ThreadFactory()
- {
- private int num;
-
- public Thread newThread(Runnable r)
- {
- Thread thread = new Thread(threadGroup, r, threadGroup.getName() + "-Thread-" + ++num);
- thread.setDaemon(true);
- return thread;
- }
- };
-
- final ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);
+ final ExecutorService executorService = ThreadPool.create(description);
+
return LifecycleUtil.delegateLifecycle(getClass().getClassLoader(), executorService, ExecutorService.class,
new ILifecycle()
- {
- private boolean active;
-
- public void activate() throws LifecycleException
- {
- active = true;
- }
-
- public Exception deactivate()
- {
- try
- {
- executorService.shutdown();
- active = false;
- return null;
- }
- catch (Exception ex)
{
- return ex;
- }
- }
-
- public LifecycleState getLifecycleState()
- {
- return active ? LifecycleState.ACTIVE : LifecycleState.INACTIVE;
- }
-
- public boolean isActive()
- {
- return active;
- }
-
- public void addListener(IListener listener)
- {
- // Do nothing
- }
-
- public void removeListener(IListener listener)
- {
- // Do nothing
- }
-
- public IListener[] getListeners()
- {
- return EventUtil.NO_LISTENERS;
- }
-
- public boolean hasListeners()
- {
- return false;
- }
-
- @Override
- public String toString()
- {
- return "CachedThreadPool";
- }
- });
+ private boolean active;
+
+ public void activate() throws LifecycleException
+ {
+ active = true;
+ }
+
+ public Exception deactivate()
+ {
+ try
+ {
+ executorService.shutdown();
+ active = false;
+ return null;
+ }
+ catch (Exception ex)
+ {
+ return ex;
+ }
+ }
+
+ public LifecycleState getLifecycleState()
+ {
+ return active ? LifecycleState.ACTIVE : LifecycleState.INACTIVE;
+ }
+
+ public boolean isActive()
+ {
+ return active;
+ }
+
+ public void addListener(IListener listener)
+ {
+ // Do nothing
+ }
+
+ public void removeListener(IListener listener)
+ {
+ // Do nothing
+ }
+
+ public IListener[] getListeners()
+ {
+ return EventUtil.NO_LISTENERS;
+ }
+
+ public boolean hasListeners()
+ {
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CachedThreadPool";
+ }
+ });
}
public static ExecutorService get(IManagedContainer container)
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java
new file mode 100644
index 0000000000..621ec5d51f
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.net4j.util.concurrent;
+
+import org.eclipse.net4j.util.lifecycle.Lifecycle;
+import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
+import org.eclipse.net4j.util.om.log.OMLogger;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
+/**
+ * @author Eike Stepper
+ * @since 3.6
+ */
+public class ExecutorWorkSerializer extends Lifecycle implements IWorkSerializer
+{
+ private final Queue<Runnable> workQueue = new LinkedList<Runnable>();
+
+ private Executor executor;
+
+ private volatile boolean working;
+
+ private volatile boolean disposed;
+
+ public ExecutorWorkSerializer()
+ {
+ }
+
+ public ExecutorWorkSerializer(Executor executor)
+ {
+ this.executor = executor;
+ }
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+
+ public void setExecutor(Executor executor)
+ {
+ checkInactive();
+ this.executor = executor;
+ }
+
+ public synchronized boolean addWork(Runnable runnable)
+ {
+ if (disposed)
+ {
+ return false;
+ }
+
+ if (!working && isActive())
+ {
+ startWork(runnable);
+ }
+ else
+ {
+ workQueue.add(runnable);
+ }
+
+ return true;
+ }
+
+ public synchronized void dispose()
+ {
+ LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG);
+ }
+
+ @Override
+ public String toString()
+ {
+ return ExecutorWorkSerializer.class.getSimpleName();
+ }
+
+ protected void handleException(Runnable runnable, Throwable ex)
+ {
+ }
+
+ protected void noWork()
+ {
+ }
+
+ private void startWork(final Runnable runnable)
+ {
+ working = true;
+ if (!disposed)
+ {
+ executor.execute(new RunnableWithName()
+ {
+ public String getName()
+ {
+ if (runnable instanceof RunnableWithName)
+ {
+ return ((RunnableWithName)runnable).getName();
+ }
+
+ return null;
+ }
+
+ public void run()
+ {
+ try
+ {
+ runnable.run();
+ }
+ catch (Throwable ex)
+ {
+ try
+ {
+ handleException(runnable, ex);
+ }
+ catch (Throwable ignore)
+ {
+ //$FALL-THROUGH$
+ }
+ }
+
+ workDone();
+ }
+ });
+ }
+ }
+
+ private synchronized void workDone()
+ {
+ Runnable runnable = workQueue.poll();
+ if (runnable != null)
+ {
+ startWork(runnable);
+ }
+ else
+ {
+ noWork();
+ working = false;
+ }
+ }
+
+ @Override
+ protected void doBeforeActivate() throws Exception
+ {
+ super.doBeforeActivate();
+ checkState(executor, "executor");
+ }
+
+ @Override
+ protected void doActivate() throws Exception
+ {
+ super.doActivate();
+ workDone();
+ }
+
+ @Override
+ protected void doDeactivate() throws Exception
+ {
+ disposed = true;
+ working = false;
+ workQueue.clear();
+
+ super.doDeactivate();
+ }
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IExecutorServiceProvider.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IExecutorServiceProvider.java
new file mode 100644
index 0000000000..becd91b6c0
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IExecutorServiceProvider.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.net4j.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @author Eike Stepper
+ * @since 3.6
+ */
+public interface IExecutorServiceProvider
+{
+ public ExecutorService getExecutorService();
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java
index 2baf087a8b..03163efaf2 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java
@@ -15,7 +15,9 @@ import org.eclipse.net4j.util.om.log.OMLogger;
/**
* @author Eike Stepper
+ * @deprecated As of 3.6 use {@link ExecutorWorkSerializer}.
*/
+@Deprecated
public class QueueWorkerWorkSerializer extends QueueRunner implements IWorkSerializer
{
public QueueWorkerWorkSerializer()
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java
new file mode 100644
index 0000000000..048de5aa4e
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.net4j.util.concurrent;
+
+/**
+ * @author Eike Stepper
+ * @since 3.6
+ */
+public interface RunnableWithName extends Runnable
+{
+ public String getName();
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java
new file mode 100644
index 0000000000..b172dc0598
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright (c) 2004-2015 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.net4j.util.concurrent;
+
+import org.eclipse.net4j.util.StringUtil;
+
+import java.lang.reflect.Method;
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Eike Stepper
+ * @since 3.6
+ */
+public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionHandler
+{
+ public static final String DEFAULT_THREAD_GROUP_NAME = ExecutorServiceFactory.DEFAULT_THREAD_GROUP_NAME;
+
+ public static final int DEFAULT_CORE_POOL_SIZE = 10;
+
+ public static final int DEFAULT_MAXIMUM_POOL_SIZE = 100;
+
+ public static final long DEFAULT_KEEP_ALIVE_SECONDS = 60;
+
+ private static final Class<?> LINKED_BLOCKING_DEQUE_CLASS;
+
+ private static final Method ADD_FIRST_METHOD;
+
+ private final Executor defaultExecutor = new Executor()
+ {
+ public void execute(Runnable runnable)
+ {
+ ThreadPool.super.execute(runnable);
+ }
+ };
+
+ private final Executor namingExecutor = new Executor()
+ {
+ public void execute(Runnable runnable)
+ {
+ if (runnable instanceof RunnableWithName)
+ {
+ String name = ((RunnableWithName)runnable).getName();
+ if (name != null)
+ {
+ Thread thread = new Thread(runnable, name);
+ thread.setDaemon(true);
+ thread.start();
+ return;
+ }
+ }
+
+ ThreadPool.super.execute(runnable);
+ }
+ };
+
+ private volatile Executor executor = defaultExecutor;
+
+ public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, ThreadFactory threadFactory)
+ {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, createWorkQueue(), threadFactory);
+ ((ThreadPool.WorkQueue)getQueue()).setThreadPool(this);
+ setRejectedExecutionHandler(this);
+ }
+
+ public final void setNaming(boolean naming)
+ {
+ executor = naming ? namingExecutor : defaultExecutor;
+ }
+
+ @Override
+ public void execute(final Runnable runnable)
+ {
+ executor.execute(runnable);
+ }
+
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor)
+ {
+ ((ThreadPool.WorkQueue)getQueue()).addFirst(runnable);
+ }
+
+ public static ThreadPool create()
+ {
+ return create(null, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_SECONDS);
+ }
+
+ public static ThreadPool create(String description)
+ {
+ String threadGroupName = null;
+ int corePoolSize = DEFAULT_CORE_POOL_SIZE;
+ int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
+ long keepAliveSeconds = DEFAULT_KEEP_ALIVE_SECONDS;
+
+ if (!StringUtil.isEmpty(description))
+ {
+ String[] tokens = description.split(":");
+ if (tokens.length > 0)
+ {
+ threadGroupName = tokens[0];
+ if (tokens.length > 1)
+ {
+ try
+ {
+ corePoolSize = Integer.parseInt(tokens[1]);
+ }
+ catch (NumberFormatException ex)
+ {
+ //$FALL-THROUGH$
+ }
+
+ if (tokens.length > 2)
+ {
+ try
+ {
+ maximumPoolSize = Integer.parseInt(tokens[2]);
+ }
+ catch (NumberFormatException ex)
+ {
+ //$FALL-THROUGH$
+ }
+
+ if (tokens.length > 3)
+ {
+ try
+ {
+ keepAliveSeconds = Long.parseLong(tokens[3]);
+ }
+ catch (NumberFormatException ex)
+ {
+ //$FALL-THROUGH$
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return create(threadGroupName, corePoolSize, maximumPoolSize, keepAliveSeconds);
+ }
+
+ public static ThreadPool create(String threadGroupName, int corePoolSize, int maximumPoolSize, long keepAliveSeconds)
+ {
+ ThreadFactory threadFactory = createThreadFactory(threadGroupName);
+ return new ThreadPool(corePoolSize, maximumPoolSize, keepAliveSeconds, threadFactory);
+ }
+
+ private static ThreadFactory createThreadFactory(String threadGroupName)
+ {
+ if (threadGroupName == null)
+ {
+ threadGroupName = DEFAULT_THREAD_GROUP_NAME;
+ }
+
+ final ThreadGroup threadGroup = new ThreadGroup(threadGroupName);
+
+ ThreadFactory threadFactory = new ThreadFactory()
+ {
+ private final AtomicInteger num = new AtomicInteger();
+
+ public Thread newThread(Runnable r)
+ {
+ Thread thread = new Thread(threadGroup, r, threadGroup.getName() + "-thread-" + num.incrementAndGet());
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+
+ return threadFactory;
+ }
+
+ private static ThreadPool.WorkQueue createWorkQueue()
+ {
+ if (LINKED_BLOCKING_DEQUE_CLASS != null)
+ {
+ try
+ {
+ return new WorkQueueJRE16();
+ }
+ catch (Throwable ex)
+ {
+ //$FALL-THROUGH$
+ }
+ }
+
+ return new WorkQueueJRE15();
+ }
+
+ static
+ {
+ Class<?> c = null;
+ Method m = null;
+
+ try
+ {
+ c = Class.forName("java.util.concurrent.LinkedBlockingDeque");
+ m = c.getMethod("addFirst", Object.class);
+ }
+ catch (Throwable ex)
+ {
+ c = null;
+ m = null;
+ }
+
+ LINKED_BLOCKING_DEQUE_CLASS = c;
+ ADD_FIRST_METHOD = m;
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private interface WorkQueue extends BlockingQueue<Runnable>
+ {
+ public void setThreadPool(ThreadPool threadPool);
+
+ public void addFirst(Runnable runnable);
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private static final class WorkQueueJRE15 extends LinkedBlockingQueue<Runnable>implements ThreadPool.WorkQueue
+ {
+ private static final long serialVersionUID = 1L;
+
+ private ThreadPool threadPool;
+
+ public WorkQueueJRE15()
+ {
+ }
+
+ public void setThreadPool(ThreadPool threadPool)
+ {
+ this.threadPool = threadPool;
+ }
+
+ public void addFirst(Runnable runnable)
+ {
+ super.offer(runnable);
+ }
+
+ @Override
+ public boolean offer(Runnable runnable)
+ {
+ if (threadPool.getPoolSize() < threadPool.getMaximumPoolSize())
+ {
+ return false;
+ }
+
+ return super.offer(runnable);
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private static final class WorkQueueJRE16 extends AbstractQueue<Runnable>implements ThreadPool.WorkQueue
+ {
+ private final BlockingQueue<Runnable> delegate = createDelegate();
+
+ private ThreadPool threadPool;
+
+ public WorkQueueJRE16()
+ {
+ }
+
+ public void setThreadPool(ThreadPool threadPool)
+ {
+ this.threadPool = threadPool;
+ }
+
+ public void addFirst(Runnable runnable)
+ {
+ try
+ {
+ ADD_FIRST_METHOD.invoke(delegate, runnable);
+ }
+ catch (Throwable ex)
+ {
+ //$FALL-THROUGH$
+ }
+
+ delegate.offer(runnable);
+ }
+
+ public boolean offer(Runnable r)
+ {
+ if (threadPool.getPoolSize() < threadPool.getMaximumPoolSize())
+ {
+ return false;
+ }
+
+ return delegate.offer(r);
+ }
+
+ @Override
+ public int size()
+ {
+ return delegate.size();
+ }
+
+ public Runnable poll()
+ {
+ return delegate.poll();
+ }
+
+ @Override
+ public Iterator<Runnable> iterator()
+ {
+ return delegate.iterator();
+ }
+
+ public Runnable peek()
+ {
+ return delegate.peek();
+ }
+
+ public void put(Runnable e) throws InterruptedException
+ {
+ delegate.put(e);
+ }
+
+ public boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException
+ {
+ return delegate.offer(e, timeout, unit);
+ }
+
+ public Runnable take() throws InterruptedException
+ {
+ return delegate.take();
+ }
+
+ public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ return delegate.poll(timeout, unit);
+ }
+
+ public int remainingCapacity()
+ {
+ return delegate.remainingCapacity();
+ }
+
+ public int drainTo(Collection<? super Runnable> c)
+ {
+ return delegate.drainTo(c);
+ }
+
+ public int drainTo(Collection<? super Runnable> c, int maxElements)
+ {
+ return delegate.drainTo(c, maxElements);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static BlockingQueue<Runnable> createDelegate()
+ {
+ try
+ {
+ return (BlockingQueue<Runnable>)LINKED_BLOCKING_DEQUE_CLASS.newInstance();
+ }
+ catch (Throwable ex)
+ {
+ //$FALL-THROUGH$
+ }
+
+ return new LinkedBlockingQueue<Runnable>();
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java
index 120b3a00d1..d51e68da91 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java
@@ -10,6 +10,8 @@
*/
package org.eclipse.net4j.util.event;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
+
import java.util.concurrent.ExecutorService;
/**
@@ -20,7 +22,7 @@ import java.util.concurrent.ExecutorService;
* @apiviz.exclude
*/
@Deprecated
-public class ExecutorServiceNotifier extends Notifier
+public class ExecutorServiceNotifier extends Notifier implements IExecutorServiceProvider
{
private ExecutorService notificationExecutorService;
@@ -28,6 +30,14 @@ public class ExecutorServiceNotifier extends Notifier
{
}
+ /**
+ * @since 3.6
+ */
+ public ExecutorService getExecutorService()
+ {
+ return notificationExecutorService;
+ }
+
@Override
public ExecutorService getNotificationService()
{
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java
index 3a93c37867..20f8d6a459 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java
@@ -12,13 +12,14 @@ package org.eclipse.net4j.util.io;
import org.eclipse.net4j.util.io.ExtendedIOUtil.ClassResolver;
+import java.io.Closeable;
import java.io.DataInput;
import java.io.IOException;
/**
* @author Eike Stepper
*/
-public class DataInputExtender implements ExtendedDataInput
+public class DataInputExtender implements ExtendedDataInput, Closeable
{
private DataInput input;
@@ -142,4 +143,15 @@ public class DataInputExtender implements ExtendedDataInput
{
return input.skipBytes(n);
}
+
+ /**
+ * @since 3.6
+ */
+ public void close() throws IOException
+ {
+ if (input instanceof Closeable)
+ {
+ ((Closeable)input).close();
+ }
+ }
}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java
index c93eab8b8e..d6192e9e72 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java
@@ -10,13 +10,14 @@
*/
package org.eclipse.net4j.util.io;
+import java.io.Closeable;
import java.io.DataOutput;
import java.io.IOException;
/**
* @author Eike Stepper
*/
-public class DataOutputExtender implements ExtendedDataOutput
+public class DataOutputExtender implements ExtendedDataOutput, Closeable
{
private DataOutput output;
@@ -125,4 +126,15 @@ public class DataOutputExtender implements ExtendedDataOutput
{
ExtendedIOUtil.writeException(output, t);
}
+
+ /**
+ * @since 3.6
+ */
+ public void close() throws IOException
+ {
+ if (output instanceof Closeable)
+ {
+ ((Closeable)output).close();
+ }
+ }
}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java
index f52c4e2983..01fbf088b7 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java
@@ -12,6 +12,7 @@ package org.eclipse.net4j.util.io;
import org.eclipse.net4j.util.io.ExtendedIOUtil.ClassResolver;
+import java.io.Closeable;
import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
@@ -48,7 +49,7 @@ public interface ExtendedDataInput extends DataInput
* @author Eike Stepper
* @since 2.0
*/
- public static class Delegating implements ExtendedDataInput
+ public static class Delegating implements ExtendedDataInput, Closeable
{
private ExtendedDataInput delegate;
@@ -177,6 +178,17 @@ public interface ExtendedDataInput extends DataInput
{
return delegate.skipBytes(n);
}
+
+ /**
+ * @since 3.6
+ */
+ public void close() throws IOException
+ {
+ if (delegate instanceof Closeable)
+ {
+ ((Closeable)delegate).close();
+ }
+ }
}
/**
@@ -209,5 +221,16 @@ public interface ExtendedDataInput extends DataInput
return -1;
}
}
+
+ @Override
+ public void close() throws IOException
+ {
+ if (delegate instanceof Closeable)
+ {
+ ((Closeable)delegate).close();
+ }
+
+ super.close();
+ }
}
}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java
index f3d47ce989..74d86e83e1 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java
@@ -10,6 +10,7 @@
*/
package org.eclipse.net4j.util.io;
+import java.io.Closeable;
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
@@ -41,7 +42,7 @@ public interface ExtendedDataOutput extends DataOutput
* @author Eike Stepper
* @since 2.0
*/
- public static class Delegating implements ExtendedDataOutput
+ public static class Delegating implements ExtendedDataOutput, Closeable
{
private ExtendedDataOutput delegate;
@@ -155,6 +156,17 @@ public interface ExtendedDataOutput extends DataOutput
{
delegate.writeUTF(str);
}
+
+ /**
+ * @since 3.6
+ */
+ public void close() throws IOException
+ {
+ if (delegate instanceof Closeable)
+ {
+ ((Closeable)delegate).close();
+ }
+ }
}
/**
@@ -180,5 +192,16 @@ public interface ExtendedDataOutput extends DataOutput
{
delegate.write(b);
}
+
+ @Override
+ public void close() throws IOException
+ {
+ if (delegate instanceof Closeable)
+ {
+ ((Closeable)delegate).close();
+ }
+
+ super.close();
+ }
}
}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/CleanableReferenceQueue.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/CleanableReferenceQueue.java
new file mode 100644
index 0000000000..4991d90692
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/CleanableReferenceQueue.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2011-2013, 2015 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ * Simon McDuff - bug 201266
+ * Simon McDuff - bug 230832
+ */
+package org.eclipse.net4j.util.ref;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Eike Stepper
+ * @since 3.6
+ */
+public abstract class CleanableReferenceQueue<T> extends ReferenceQueue<T>
+{
+ public static final int ALL_WORK_PER_POLL = ReferenceQueueWorker.ALL_WORK_PER_POLL;
+
+ public static final int DEFAULT_MAX_WORK_PER_POLL = ReferenceQueueWorker.DEFAULT_MAX_WORK_PER_POLL;
+
+ public static final int DEFAULT_POLL_MILLIS = ReferenceQueueWorker.DEFAULT_POLL_MILLIS;
+
+ private final AtomicBoolean cleaning = new AtomicBoolean();
+
+ private int maxWorkPerPoll;
+
+ private long pollMillis;
+
+ private long lastPoll = System.currentTimeMillis();
+
+ public CleanableReferenceQueue()
+ {
+ setPollMillis(DEFAULT_POLL_MILLIS);
+ setMaxWorkPerPoll(DEFAULT_MAX_WORK_PER_POLL);
+ }
+
+ public final long getPollMillis()
+ {
+ return pollMillis;
+ }
+
+ public final void setPollMillis(long pollMillis)
+ {
+ this.pollMillis = pollMillis;
+ }
+
+ public final int getMaxWorkPerPoll()
+ {
+ return maxWorkPerPoll;
+ }
+
+ public final void setMaxWorkPerPoll(int maxWorkPerPoll)
+ {
+ this.maxWorkPerPoll = maxWorkPerPoll;
+ }
+
+ public final void register(T object)
+ {
+ clean();
+ createReference(object);
+ }
+
+ public final void clean()
+ {
+ if (cleaning.compareAndSet(false, true))
+ {
+ long now = System.currentTimeMillis();
+ if (lastPoll + pollMillis > now)
+ {
+ int count = maxWorkPerPoll;
+ if (count == ALL_WORK_PER_POLL)
+ {
+ count = Integer.MAX_VALUE;
+ }
+
+ for (int i = 0; i < count; i++)
+ {
+ Reference<? extends T> reference = poll();
+ if (reference == null)
+ {
+ break;
+ }
+
+ cleanReference(reference);
+ }
+
+ lastPoll = now;
+ }
+
+ cleaning.set(false);
+ }
+ }
+
+ protected abstract void cleanReference(Reference<? extends T> reference);
+
+ protected abstract Reference<T> createReference(T object);
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java
index 0fc0d05f0f..2598c12aec 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java
@@ -25,17 +25,17 @@ public abstract class ReferenceQueueWorker<T> extends Worker
/**
* @since 3.0
*/
- public static final int DEFAULT_POLL_MILLIS = 1000 * 60; // One minute
+ public static final int ALL_WORK_PER_POLL = -1;
/**
* @since 3.0
*/
- public static final int ALL_WORK_PER_POLL = -1;
+ public static final int DEFAULT_MAX_WORK_PER_POLL = ALL_WORK_PER_POLL;
/**
* @since 3.0
*/
- public static final int DEFAULT_MAX_WORK_PER_POLL = ALL_WORK_PER_POLL;
+ public static final int DEFAULT_POLL_MILLIS = 1000 * 60; // One minute
private ReferenceQueue<T> queue = new ReferenceQueue<T>();
diff --git a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
index 5cdff1a0c1..67e3c0df64 100644
--- a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-SymbolicName: org.eclipse.net4j;singleton:=true
-Bundle-Version: 4.4.100.qualifier
+Bundle-Version: 4.5.0.qualifier
Bundle-Name: %pluginName
Bundle-Vendor: %providerName
Bundle-Localization: plugin
@@ -11,7 +11,7 @@ Bundle-RequiredExecutionEnvironment: J2SE-1.5
Bundle-ClassPath: .
Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resolution:=optional,
org.eclipse.net4j.util;bundle-version="[3.0.0,4.0.0)";visibility:=reexport
-Export-Package: org.eclipse.internal.net4j;version="4.4.100";
+Export-Package: org.eclipse.internal.net4j;version="4.5.0";
x-friends:="org.eclipse.net4j.http.server,
org.eclipse.net4j.jvm,
org.eclipse.net4j.tcp,
@@ -20,7 +20,7 @@ Export-Package: org.eclipse.internal.net4j;version="4.4.100";
org.eclipse.net4j.http.tests,
org.eclipse.net4j.tests,
org.eclipse.net4j.defs",
- org.eclipse.internal.net4j.buffer;version="4.4.100";
+ org.eclipse.internal.net4j.buffer;version="4.5.0";
x-friends:="org.eclipse.net4j.http.server,
org.eclipse.net4j.jvm,
org.eclipse.net4j.tcp,
@@ -29,17 +29,17 @@ Export-Package: org.eclipse.internal.net4j;version="4.4.100";
org.eclipse.net4j.http.tests,
org.eclipse.net4j.tests,
org.eclipse.net4j.defs",
- org.eclipse.internal.net4j.bundle;version="4.4.100";x-internal:=true,
- org.eclipse.net4j;version="4.4.100",
- org.eclipse.net4j.acceptor;version="4.4.100",
- org.eclipse.net4j.buffer;version="4.4.100",
- org.eclipse.net4j.channel;version="4.4.100",
- org.eclipse.net4j.connector;version="4.4.100",
- org.eclipse.net4j.protocol;version="4.4.100",
- org.eclipse.net4j.signal;version="4.4.100",
- org.eclipse.net4j.signal.confirmation;version="4.4.100",
- org.eclipse.net4j.signal.heartbeat;version="4.4.100",
- org.eclipse.net4j.signal.security;version="4.4.100",
- org.eclipse.net4j.signal.wrapping;version="4.4.100",
- org.eclipse.spi.net4j;version="4.4.100"
+ org.eclipse.internal.net4j.bundle;version="4.5.0";x-internal:=true,
+ org.eclipse.net4j;version="4.5.0",
+ org.eclipse.net4j.acceptor;version="4.5.0",
+ org.eclipse.net4j.buffer;version="4.5.0",
+ org.eclipse.net4j.channel;version="4.5.0",
+ org.eclipse.net4j.connector;version="4.5.0",
+ org.eclipse.net4j.protocol;version="4.5.0",
+ org.eclipse.net4j.signal;version="4.5.0",
+ org.eclipse.net4j.signal.confirmation;version="4.5.0",
+ org.eclipse.net4j.signal.heartbeat;version="4.5.0",
+ org.eclipse.net4j.signal.security;version="4.5.0",
+ org.eclipse.net4j.signal.wrapping;version="4.5.0",
+ org.eclipse.spi.net4j;version="4.5.0"
Eclipse-BuddyPolicy: registered
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/TransportConfig.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/TransportConfig.java
index ec1ce20bb4..9f765705e6 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/TransportConfig.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/TransportConfig.java
@@ -14,6 +14,7 @@ import org.eclipse.net4j.ITransportConfig;
import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.protocol.IProtocolProvider;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.lifecycle.ILifecycle;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.security.INegotiator;
@@ -24,7 +25,7 @@ import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
*/
-public class TransportConfig implements ITransportConfig
+public class TransportConfig implements ITransportConfig, IExecutorServiceProvider
{
private transient ILifecycle lifecycle;
@@ -70,6 +71,11 @@ public class TransportConfig implements ITransportConfig
this.lifecycle = lifecycle;
}
+ public ExecutorService getExecutorService()
+ {
+ return receiveExecutor;
+ }
+
public ExecutorService getReceiveExecutor()
{
return receiveExecutor;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/BufferPool.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/BufferPool.java
index ff6a8f187c..f1efa170e3 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/BufferPool.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/BufferPool.java
@@ -15,6 +15,7 @@ import org.eclipse.net4j.buffer.IBufferPool;
import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
import org.eclipse.net4j.util.om.trace.ContextTracer;
+import org.eclipse.net4j.util.ref.CleanableReferenceQueue;
import org.eclipse.internal.net4j.bundle.OM;
@@ -42,10 +43,27 @@ public class BufferPool extends BufferProvider implements IBufferPool.Introspect
private final Queue<BufferRef> buffers = new ConcurrentLinkedQueue<BufferRef>();
@ExcludeFromDump
- private final ReferenceQueue<IBuffer> referenceQueue = new ReferenceQueue<IBuffer>();
+ private final CleanableReferenceQueue<IBuffer> referenceQueue = new CleanableReferenceQueue<IBuffer>()
+ {
+ @Override
+ protected Reference<IBuffer> createReference(IBuffer buffer)
+ {
+ return new BufferRef(buffer, this);
+ }
- @ExcludeFromDump
- private Monitor monitor;
+ @Override
+ protected void cleanReference(Reference<? extends IBuffer> reference)
+ {
+ if (buffers.remove(reference))
+ {
+ --pooledBuffers;
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Collected buffer"); //$NON-NLS-1$
+ }
+ }
+ }
+ };
public BufferPool(IBufferProvider provider)
{
@@ -169,22 +187,6 @@ public class BufferPool extends BufferProvider implements IBufferPool.Introspect
++pooledBuffers;
}
- @Override
- protected void doActivate() throws Exception
- {
- super.doActivate();
- monitor = new Monitor();
- monitor.start();
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- monitor.interrupt();
- monitor = null;
- super.doDeactivate();
- }
-
private static final class BufferRef extends SoftReference<IBuffer>
{
public BufferRef(IBuffer buffer, ReferenceQueue<IBuffer> queue)
@@ -192,60 +194,4 @@ public class BufferPool extends BufferProvider implements IBufferPool.Introspect
super(buffer, queue);
}
}
-
- private final class Monitor extends Thread
- {
- public Monitor()
- {
- setName("Net4jBufferPoolMonitor"); //$NON-NLS-1$
- setDaemon(true);
- }
-
- @Override
- public void run()
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Start monitoring"); //$NON-NLS-1$
- }
-
- try
- {
- while (isActive() && !isInterrupted())
- {
- Reference<? extends IBuffer> bufferRef = pollQueue();
- if (bufferRef != null)
- {
- if (buffers.remove(bufferRef))
- {
- --pooledBuffers;
- if (TRACER.isEnabled())
- {
- TRACER.trace("Collected buffer"); //$NON-NLS-1$
- }
- }
- }
- }
- }
- catch (InterruptedException ex)
- {
- return;
- }
- finally
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Stop monitoring"); //$NON-NLS-1$
- }
- }
- }
-
- /**
- * Factored out for better profiling.
- */
- private Reference<? extends IBuffer> pollQueue() throws InterruptedException
- {
- return referenceQueue.remove(5000);
- }
- }
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
index e3676f8794..fd1a9a33e6 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
@@ -165,6 +165,11 @@ public class BufferInputStream extends InputStream implements IBufferHandler
buffers = null;
currentBuffer = null;
super.close();
+
+ if (ccam)
+ {
+ closeChannel();
+ }
}
@Override
@@ -219,6 +224,16 @@ public class BufferInputStream extends InputStream implements IBufferHandler
return true;
}
+ /**
+ * Subclasses may override.
+ *
+ * @since 4.5
+ */
+ protected void closeChannel()
+ {
+ // Do nothing.
+ }
+
private long computeTimeout(final long check) throws IOTimeoutException
{
long remaining;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
index 52c99209bc..6fa3eb3cac 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
@@ -16,6 +16,7 @@ import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.io.IOTimeoutException;
+import org.eclipse.net4j.util.io.IOUtil;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.om.trace.ContextTracer;
@@ -194,6 +195,22 @@ public abstract class Signal implements Runnable
}
/**
+ * @since 4.5
+ */
+ protected boolean closeInputStreamAfterMe()
+ {
+ return true;
+ }
+
+ /**
+ * @since 4.5
+ */
+ protected boolean closeOutputStreamAfterMe()
+ {
+ return true;
+ }
+
+ /**
* @since 2.0
*/
protected InputStream getCurrentInputStream()
@@ -265,6 +282,16 @@ public abstract class Signal implements Runnable
}
finally
{
+ if (closeInputStreamAfterMe())
+ {
+ IOUtil.closeSilent(bufferInputStream);
+ }
+
+ if (closeOutputStreamAfterMe())
+ {
+ IOUtil.closeSilent(bufferOutputStream);
+ }
+
protocol.stopSignal(this, exception);
}
}
@@ -274,11 +301,19 @@ public abstract class Signal implements Runnable
this.correlationID = correlationID;
}
+ /**
+ * Transfers ownership of the passed stream to this signal.
+ * The signal closes the stream when done reading.
+ */
void setBufferInputStream(BufferInputStream inputStream)
{
bufferInputStream = inputStream;
}
+ /**
+ * Transfers ownership of the passed stream to this signal.
+ * The signal closes the stream when done writing.
+ */
void setBufferOutputStream(BufferOutputStream outputStream)
{
bufferOutputStream = outputStream;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
index ae28ab0f50..fdfa654432 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
@@ -18,7 +18,6 @@ import org.eclipse.net4j.channel.ChannelOutputStream;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.connector.IConnector;
import org.eclipse.net4j.util.WrappedException;
-import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.event.Event;
import org.eclipse.net4j.util.event.IEvent;
import org.eclipse.net4j.util.event.IListener;
@@ -31,7 +30,6 @@ import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.internal.net4j.bundle.OM;
-import org.eclipse.spi.net4j.InternalChannel;
import org.eclipse.spi.net4j.Protocol;
import java.io.IOException;
@@ -41,7 +39,6 @@ import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
/**
* The default implementation of a {@link ISignalProtocol signal protocol}.
@@ -635,24 +632,10 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE>
}
@Override
- public int read() throws IOException
+ protected void closeChannel()
{
- if (isCCAM())
- {
- final InternalChannel channel = (InternalChannel)getChannel();
-
- ExecutorService executorService = channel.getReceiveExecutor();
- executorService.submit(new Runnable()
- {
- public void run()
- {
- ConcurrencyUtil.sleep(500);
- channel.close();
- }
- });
- }
-
- return super.read();
+ IChannel channel = getChannel();
+ LifecycleUtil.deactivate(channel);
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
index 451ee5d7d3..10dc6c0680 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
@@ -16,8 +16,10 @@ import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferHandler;
import org.eclipse.net4j.channel.IChannelMultiplexer;
import org.eclipse.net4j.protocol.IProtocol;
+import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.concurrent.IWorkSerializer;
-import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer;
+import org.eclipse.net4j.util.concurrent.RunnableWithName;
import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer;
import org.eclipse.net4j.util.event.Event;
import org.eclipse.net4j.util.event.IListener;
@@ -33,6 +35,7 @@ import org.eclipse.spi.net4j.InternalChannel.SendQueueEvent.Type;
import java.text.MessageFormat;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,7 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author Eike Stepper
* @since 2.0
*/
-public class Channel extends Lifecycle implements InternalChannel
+public class Channel extends Lifecycle implements InternalChannel, IExecutorServiceProvider
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CHANNEL, Channel.class);
@@ -117,6 +120,14 @@ public class Channel extends Lifecycle implements InternalChannel
this.id = id;
}
+ /**
+ * @since 4.5
+ */
+ public ExecutorService getExecutorService()
+ {
+ return receiveExecutor;
+ }
+
public ExecutorService getReceiveExecutor()
{
return receiveExecutor;
@@ -219,7 +230,9 @@ public class Channel extends Lifecycle implements InternalChannel
}
++receivedBuffers;
- receiveSerializer.addWork(createReceiverWork(buffer));
+
+ ReceiverWork receiverWork = createReceiverWork(buffer);
+ receiveSerializer.addWork(receiverWork);
}
else
{
@@ -273,13 +286,14 @@ public class Channel extends Lifecycle implements InternalChannel
{
super.doActivate();
sendQueue = new SendQueue();
- if (receiveExecutor == null)
+ if (receiveExecutor != null)
{
- receiveSerializer = new SynchronousWorkSerializer();
+ receiveSerializer = new ReceiveSerializer2(receiveExecutor);
+ LifecycleUtil.activate(receiveSerializer);
}
else
{
- receiveSerializer = new ReceiveSerializer();
+ receiveSerializer = new SynchronousWorkSerializer();
}
}
@@ -322,8 +336,10 @@ public class Channel extends Lifecycle implements InternalChannel
*
* @author Eike Stepper
* @since 4.1
+ * @deprecated As of 4.4 use {@link ExecutorWorkSerializer}.
*/
- protected class ReceiveSerializer extends QueueWorkerWorkSerializer
+ @Deprecated
+ protected class ReceiveSerializer extends org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer
{
@Override
protected String getThreadName()
@@ -345,8 +361,31 @@ public class Channel extends Lifecycle implements InternalChannel
* If the meaning of this type isn't clear, there really should be more of a description here...
*
* @author Eike Stepper
+ * @since 4.4
+ */
+ private class ReceiveSerializer2 extends ExecutorWorkSerializer
+ {
+ public ReceiveSerializer2(Executor executor)
+ {
+ super(executor);
+ }
+
+ @Override
+ protected void noWork()
+ {
+ if (isClosed())
+ {
+ dispose();
+ }
+ }
+ }
+
+ /**
+ * If the meaning of this type isn't clear, there really should be more of a description here...
+ *
+ * @author Eike Stepper
*/
- protected class ReceiverWork implements Runnable
+ protected class ReceiverWork implements RunnableWithName
{
private final IBuffer buffer;
@@ -358,6 +397,14 @@ public class Channel extends Lifecycle implements InternalChannel
this.buffer = buffer;
}
+ /**
+ * @since 4.5
+ */
+ public String getName()
+ {
+ return "Net4jReceiveSerializer-" + Channel.this; //$NON-NLS-1$
+ }
+
public void run()
{
IBufferHandler receiveHandler = getReceiveHandler();
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
index f3e3ece4e3..b570c88f70 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
@@ -14,6 +14,7 @@ import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.protocol.IProtocol2;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.lifecycle.ILifecycle;
import org.eclipse.net4j.util.lifecycle.Lifecycle;
@@ -29,7 +30,8 @@ import java.util.concurrent.ExecutorService;
* @author Eike Stepper
* @since 2.0
*/
-public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle implements IProtocol2<INFRA_STRUCTURE>
+public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle
+ implements IProtocol2<INFRA_STRUCTURE>, IExecutorServiceProvider
{
private String type;

Back to the top