diff options
author | Eike Stepper | 2015-07-22 12:25:46 +0000 |
---|---|---|
committer | Eike Stepper | 2015-07-22 12:25:46 +0000 |
commit | 017c0e91d0dc68b7ead1b4c893f032d3d72e4e31 (patch) | |
tree | 43b63e279685bbafc4e131a243609487e5112c99 /plugins | |
parent | c8eba7351ffd2d37e17db3ae3c2a7009d366b769 (diff) | |
download | cdo-017c0e91d0dc68b7ead1b4c893f032d3d72e4e31.tar.gz cdo-017c0e91d0dc68b7ead1b4c893f032d3d72e4e31.tar.xz cdo-017c0e91d0dc68b7ead1b4c893f032d3d72e4e31.zip |
[473277] Enhance ThreadPool and use it as much as possible
https://bugs.eclipse.org/bugs/show_bug.cgi?id=473277
Diffstat (limited to 'plugins')
58 files changed, 1622 insertions, 498 deletions
diff --git a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/AbstractCDORevisionCache.java b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/AbstractCDORevisionCache.java index 7d9f7ba982..8ae662e7db 100644 --- a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/AbstractCDORevisionCache.java +++ b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/AbstractCDORevisionCache.java @@ -14,6 +14,7 @@ package org.eclipse.emf.cdo.internal.common.revision; import org.eclipse.emf.cdo.common.branch.CDOBranch; import org.eclipse.emf.cdo.common.branch.CDOBranchManager; +import org.eclipse.emf.cdo.common.branch.CDOBranchVersion; import org.eclipse.emf.cdo.common.id.CDOID; import org.eclipse.emf.cdo.common.revision.CDORevision; import org.eclipse.emf.cdo.common.revision.CDORevisionKey; @@ -21,8 +22,11 @@ import org.eclipse.emf.cdo.internal.common.bundle.OM; import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision; import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionCache; +import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; import org.eclipse.net4j.util.event.IListener; +import org.eclipse.net4j.util.lifecycle.Lifecycle; import org.eclipse.net4j.util.om.trace.ContextTracer; +import org.eclipse.net4j.util.ref.CleanableReferenceQueue; import org.eclipse.net4j.util.ref.ReferenceQueueWorker; import java.lang.ref.Reference; @@ -33,20 +37,36 @@ import java.text.MessageFormat; /** * @author Eike Stepper */ -public abstract class AbstractCDORevisionCache extends ReferenceQueueWorker<InternalCDORevision> - implements InternalCDORevisionCache +public abstract class AbstractCDORevisionCache extends Lifecycle implements InternalCDORevisionCache { private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_REVISION, AbstractCDORevisionCache.class); private static boolean disableGC; + @ExcludeFromDump + private final CleanableReferenceQueue<InternalCDORevision> referenceQueue = new CleanableReferenceQueue<InternalCDORevision>() + { + @Override + protected Reference<InternalCDORevision> createReference(InternalCDORevision revision) + { + return AbstractCDORevisionCache.this.createReference(revision); + } + + @Override + protected void cleanReference(Reference<? extends InternalCDORevision> reference) + { + AbstractCDORevisionCache.this.cleanReference(reference); + } + }; + private CDOBranchManager branchManager; private String name; public AbstractCDORevisionCache() { - setDaemon(true); + setPollMillis(ReferenceQueueWorker.DEFAULT_POLL_MILLIS); + setMaxWorkPerPoll(ReferenceQueueWorker.DEFAULT_MAX_WORK_PER_POLL); } public CDOBranchManager getBranchManager() @@ -82,49 +102,51 @@ public abstract class AbstractCDORevisionCache extends ReferenceQueueWorker<Inte this.name = name; } - @Override - public String toString() + public long getPollMillis() { - return formatName("CDORevisionCache"); + return referenceQueue.getPollMillis(); } - @Override - protected String getThreadName() + public void setPollMillis(long pollMillis) { - return formatName("CDORevisionCacheCleaner"); + referenceQueue.setPollMillis(pollMillis); } - private String formatName(String prefix) + public int getMaxWorkPerPoll() { - return prefix + (name == null ? "" : "-" + name); + return referenceQueue.getMaxWorkPerPoll(); } - @Override - protected void work(Reference<? extends InternalCDORevision> reference) + public void setMaxWorkPerPoll(int maxWorkPerPoll) { - CDORevisionKey key = (CDORevisionKey)reference; + referenceQueue.setMaxWorkPerPoll(maxWorkPerPoll); + } - CDOID id = key.getID(); - CDOBranch branch = key.getBranch(); - int version = key.getVersion(); + public final void addRevision(CDORevision revision) + { + referenceQueue.register((InternalCDORevision)revision); + doAddRevision(revision); + } - InternalCDORevision revision = (InternalCDORevision)removeRevision(id, branch.getVersion(version)); - if (revision != null) - { - // Use revision in eviction event - key = revision; - } + protected abstract void doAddRevision(CDORevision revision); - IListener[] listeners = getListeners(); - if (listeners != null) - { - fireEvent(new EvictionEventImpl(this, key), listeners); - } + public final CDORevision removeRevision(CDOID id, CDOBranchVersion branchVersion) + { + referenceQueue.clean(); + return doRemoveRevision(id, branchVersion); + } - if (TRACER.isEnabled()) - { - TRACER.format("Evicted {0} from {1}", key, this); //$NON-NLS-1$ - } + protected abstract CDORevision doRemoveRevision(CDOID id, CDOBranchVersion branchVersion); + + @Override + public String toString() + { + return formatName("CDORevisionCache"); + } + + private String formatName(String prefix) + { + return prefix + (name == null ? "" : "-" + name); } protected Reference<InternalCDORevision> createReference(CDORevision revision) @@ -139,7 +161,7 @@ public abstract class AbstractCDORevisionCache extends ReferenceQueueWorker<Inte TRACER.format("Adding revision {0} to {1}", revision, this); //$NON-NLS-1$ } - return new CacheSoftReference((InternalCDORevision)revision, getQueue()); + return new CacheSoftReference((InternalCDORevision)revision, referenceQueue); } private Reference<InternalCDORevision> createStrongReference(CDORevision revision) @@ -152,6 +174,33 @@ public abstract class AbstractCDORevisionCache extends ReferenceQueueWorker<Inte return new CacheStrongReference((InternalCDORevision)revision); } + protected void cleanReference(Reference<? extends InternalCDORevision> reference) + { + CDORevisionKey key = (CDORevisionKey)reference; + + CDOID id = key.getID(); + CDOBranch branch = key.getBranch(); + int version = key.getVersion(); + + InternalCDORevision revision = (InternalCDORevision)removeRevision(id, branch.getVersion(version)); + if (revision != null) + { + // Use revision in eviction event + key = revision; + } + + IListener[] listeners = getListeners(); + if (listeners != null) + { + fireEvent(new EvictionEventImpl(this, key), listeners); + } + + if (TRACER.isEnabled()) + { + TRACER.format("Evicted {0} from {1}", key, this); //$NON-NLS-1$ + } + } + /** * @author Eike Stepper */ diff --git a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheAuditing.java b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheAuditing.java index c89da07d7f..391c32cd86 100644 --- a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheAuditing.java +++ b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheAuditing.java @@ -168,7 +168,8 @@ public class CDORevisionCacheAuditing extends AbstractCDORevisionCache return result; } - public void addRevision(CDORevision revision) + @Override + protected void doAddRevision(CDORevision revision) { CheckUtil.checkArg(revision, "revision"); @@ -194,7 +195,8 @@ public class CDORevisionCacheAuditing extends AbstractCDORevisionCache } } - public InternalCDORevision removeRevision(CDOID id, CDOBranchVersion branchVersion) + @Override + protected InternalCDORevision doRemoveRevision(CDOID id, CDOBranchVersion branchVersion) { CDOBranch branch = branchVersion.getBranch(); checkBranch(branch); diff --git a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheNonAuditing.java b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheNonAuditing.java index f5889d250a..f4b2a8a893 100644 --- a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheNonAuditing.java +++ b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/internal/common/revision/CDORevisionCacheNonAuditing.java @@ -179,7 +179,8 @@ public class CDORevisionCacheNonAuditing extends AbstractCDORevisionCache return result; } - public void addRevision(CDORevision revision) + @Override + protected void doAddRevision(CDORevision revision) { CheckUtil.checkArg(revision, "revision"); checkBranch(revision.getBranch()); @@ -208,7 +209,8 @@ public class CDORevisionCacheNonAuditing extends AbstractCDORevisionCache } } - public InternalCDORevision removeRevision(CDOID id, CDOBranchVersion branchVersion) + @Override + protected InternalCDORevision doRemoveRevision(CDOID id, CDOBranchVersion branchVersion) { checkBranch(branchVersion.getBranch()); synchronized (revisions) diff --git a/plugins/org.eclipse.emf.cdo.examples/META-INF/MANIFEST.MF b/plugins/org.eclipse.emf.cdo.examples/META-INF/MANIFEST.MF index b372da0479..1e9592340d 100644 --- a/plugins/org.eclipse.emf.cdo.examples/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.emf.cdo.examples/META-INF/MANIFEST.MF @@ -1,7 +1,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-SymbolicName: org.eclipse.emf.cdo.examples;singleton:=true -Bundle-Version: 4.0.400.qualifier +Bundle-Version: 4.0.500.qualifier Bundle-Name: %pluginName Bundle-Vendor: %providerName Bundle-Activator: org.eclipse.emf.cdo.internal.examples.bundle.OM$Activator @@ -20,7 +20,7 @@ Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resoluti org.eclipse.net4j.tcp;bundle-version="[4.0.0,5.0.0)", org.eclipse.net4j.db.h2;bundle-version="[4.0.0,5.0.0)" Import-Package: org.h2.jdbcx;version="[1.0.0,2.0.0)" -Export-Package: org.eclipse.emf.cdo.examples;version="4.0.400";x-internal:=true, - org.eclipse.emf.cdo.examples.server;version="4.0.400";x-internal:=true, - org.eclipse.emf.cdo.examples.server.offline;version="4.0.400";x-internal:=true, - org.eclipse.emf.cdo.internal.examples.bundle;version="4.0.400";x-internal:=true +Export-Package: org.eclipse.emf.cdo.examples;version="4.0.500";x-internal:=true, + org.eclipse.emf.cdo.examples.server;version="4.0.500";x-internal:=true, + org.eclipse.emf.cdo.examples.server.offline;version="4.0.500";x-internal:=true, + org.eclipse.emf.cdo.internal.examples.bundle;version="4.0.500";x-internal:=true diff --git a/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/StandaloneManualExample.java b/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/StandaloneManualExample.java index 01915f9e56..1ec52e6ed8 100644 --- a/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/StandaloneManualExample.java +++ b/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/StandaloneManualExample.java @@ -23,6 +23,7 @@ import org.eclipse.net4j.FactoriesProtocolProvider; import org.eclipse.net4j.Net4jUtil; import org.eclipse.net4j.buffer.IBufferProvider; import org.eclipse.net4j.protocol.IProtocolProvider; +import org.eclipse.net4j.util.concurrent.ThreadPool; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.om.OMPlatform; import org.eclipse.net4j.util.om.log.PrintLogHandler; @@ -31,8 +32,6 @@ import org.eclipse.net4j.util.om.trace.PrintTraceHandler; import org.eclipse.emf.ecore.EObject; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; /** * @author Eike Stepper @@ -47,16 +46,7 @@ public class StandaloneManualExample OMPlatform.INSTANCE.addTraceHandler(PrintTraceHandler.CONSOLE); // Prepare receiveExecutor - final ThreadGroup threadGroup = new ThreadGroup("net4j"); //$NON-NLS-1$ - ExecutorService receiveExecutor = Executors.newCachedThreadPool(new ThreadFactory() - { - public Thread newThread(Runnable r) - { - Thread thread = new Thread(threadGroup, r); - thread.setDaemon(true); - return thread; - } - }); + ExecutorService receiveExecutor = ThreadPool.create(); // Prepare bufferProvider IBufferProvider bufferProvider = Net4jUtil.createBufferPool(); diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitTransactionIndication.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitTransactionIndication.java index 0a15407ab3..83e84b56dd 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitTransactionIndication.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitTransactionIndication.java @@ -85,14 +85,10 @@ public class CommitTransactionIndication extends CDOServerIndicationWithMonitori return commitContext.getPackageRegistry(); } - @Override - protected void indicatingFailed() + protected void initializeCommitContext(CDODataInput in) throws Exception { - if (commitContext != null) - { - commitContext.postCommit(false); - commitContext = null; - } + int viewID = in.readInt(); + commitContext = getTransaction(viewID).createCommitContext(); } @Override @@ -101,8 +97,8 @@ public class CommitTransactionIndication extends CDOServerIndicationWithMonitori try { monitor.begin(OMMonitor.TEN); - indicatingCommit(in, monitor.fork(OMMonitor.ONE)); - indicatingCommit(monitor.fork(OMMonitor.TEN - OMMonitor.ONE)); + indicatingRead(in, monitor.fork(OMMonitor.ONE)); + indicatingCommit(in, monitor.fork(OMMonitor.TEN - OMMonitor.ONE)); } catch (IOException ex) { @@ -119,7 +115,7 @@ public class CommitTransactionIndication extends CDOServerIndicationWithMonitori } } - protected void indicatingCommit(CDODataInput in, OMMonitor monitor) throws Exception + protected void indicatingRead(CDODataInput in, OMMonitor monitor) throws Exception { // Create commit context initializeCommitContext(in); @@ -292,32 +288,19 @@ public class CommitTransactionIndication extends CDOServerIndicationWithMonitori } } - private ResourceSet createResourceSet(InternalCDOPackageRegistry packageRegistry) - { - ResourceSet resourceSet = new ResourceSetImpl() - { - @Override - protected void demandLoad(Resource resource) throws IOException - { - // Do nothing: we don't want this ResourceSet to attempt demand-loads. - } - }; - - Resource.Factory resourceFactory = new EcoreResourceFactoryImpl(); - resourceSet.getResourceFactoryRegistry().getExtensionToFactoryMap().put("*", resourceFactory); //$NON-NLS-1$ - resourceSet.setPackageRegistry(packageRegistry); - return resourceSet; - } - - protected void initializeCommitContext(CDODataInput in) throws Exception + protected void indicatingCommit(CDODataInput in, OMMonitor monitor) { - int viewID = in.readInt(); - commitContext = getTransaction(viewID).createCommitContext(); + getRepository().commit(commitContext, monitor); } - protected void indicatingCommit(OMMonitor monitor) + @Override + protected void indicatingFailed() { - getRepository().commit(commitContext, monitor); + if (commitContext != null) + { + commitContext.postCommit(false); + commitContext = null; + } } @Override @@ -466,4 +449,21 @@ public class CommitTransactionIndication extends CDOServerIndicationWithMonitori throw new IllegalStateException("Illegal transaction: " + view); //$NON-NLS-1$ } + + private ResourceSet createResourceSet(InternalCDOPackageRegistry packageRegistry) + { + ResourceSet resourceSet = new ResourceSetImpl() + { + @Override + protected void demandLoad(Resource resource) throws IOException + { + // Do nothing: we don't want this ResourceSet to attempt demand-loads. + } + }; + + Resource.Factory resourceFactory = new EcoreResourceFactoryImpl(); + resourceSet.getResourceFactoryRegistry().getExtensionToFactoryMap().put("*", resourceFactory); //$NON-NLS-1$ + resourceSet.setPackageRegistry(packageRegistry); + return resourceSet; + } } diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitXATransactionPhase1Indication.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitXATransactionPhase1Indication.java index f70b26ced3..409eac0fff 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitXATransactionPhase1Indication.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CommitXATransactionPhase1Indication.java @@ -30,10 +30,17 @@ public class CommitXATransactionPhase1Indication extends CommitTransactionIndica } @Override - protected void indicatingCommit(OMMonitor monitor) + protected boolean closeInputStreamAfterMe() + { + // The commit manager processes phase1 asynchronously, so don't close the input stream on him. + return false; + } + + @Override + protected void indicatingCommit(CDODataInput in, OMMonitor monitor) { // Register transactionContext - getRepository().getCommitManager().preCommit(commitContext, monitor); + getRepository().getCommitManager().preCommit(commitContext, in, monitor); } @Override diff --git a/plugins/org.eclipse.emf.cdo.server.security/META-INF/MANIFEST.MF b/plugins/org.eclipse.emf.cdo.server.security/META-INF/MANIFEST.MF index 04d3066cb9..b4daf94f68 100644 --- a/plugins/org.eclipse.emf.cdo.server.security/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.emf.cdo.server.security/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-SymbolicName: org.eclipse.emf.cdo.server.security;singleton:=true Bundle-Name: %pluginName -Bundle-Version: 4.3.100.qualifier +Bundle-Version: 4.3.200.qualifier Bundle-ClassPath: . Bundle-Vendor: %providerName Bundle-Localization: plugin @@ -12,15 +12,15 @@ Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)", org.eclipse.emf.cdo.security;bundle-version="[4.1.0,5.0.0)", org.eclipse.emf.cdo.net4j;bundle-version="[4.1.0,5.0.0)", org.eclipse.net4j.jvm;bundle-version="[4.1.0,5.0.0)" -Export-Package: org.eclipse.emf.cdo.server.internal.security;version="4.3.100"; +Export-Package: org.eclipse.emf.cdo.server.internal.security;version="4.3.200"; x-friends:="org.eclipse.emf.cdo.tests, org.eclipse.emf.cdo.tests.db, org.eclipse.emf.cdo.tests.db4o, org.eclipse.emf.cdo.tests.hibernate, org.eclipse.emf.cdo.tests.mongodb, org.eclipse.emf.cdo.tests.objectivity", - org.eclipse.emf.cdo.server.internal.security.bundle;version="4.3.100";x-internal:=true, - org.eclipse.emf.cdo.server.security;version="4.3.100", - org.eclipse.emf.cdo.server.spi.security;version="4.3.100" + org.eclipse.emf.cdo.server.internal.security.bundle;version="4.3.200";x-internal:=true, + org.eclipse.emf.cdo.server.security;version="4.3.200", + org.eclipse.emf.cdo.server.spi.security;version="4.3.200" Bundle-ActivationPolicy: lazy Bundle-Activator: org.eclipse.emf.cdo.server.internal.security.bundle.OM$Activator diff --git a/plugins/org.eclipse.emf.cdo.server.security/src/org/eclipse/emf/cdo/server/spi/security/HomeFolderHandler.java b/plugins/org.eclipse.emf.cdo.server.security/src/org/eclipse/emf/cdo/server/spi/security/HomeFolderHandler.java index 09738b6cfb..f2a9651c9c 100644 --- a/plugins/org.eclipse.emf.cdo.server.security/src/org/eclipse/emf/cdo/server/spi/security/HomeFolderHandler.java +++ b/plugins/org.eclipse.emf.cdo.server.security/src/org/eclipse/emf/cdo/server/spi/security/HomeFolderHandler.java @@ -26,6 +26,7 @@ import org.eclipse.emf.cdo.transaction.CDOTransaction; import org.eclipse.emf.cdo.view.CDOView; import org.eclipse.net4j.util.concurrent.ExecutorServiceFactory; +import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider; import org.eclipse.net4j.util.container.IManagedContainer; import org.eclipse.net4j.util.container.IManagedContainer.ContainerAware; import org.eclipse.net4j.util.factory.ProductCreationException; @@ -43,7 +44,7 @@ import java.util.concurrent.ExecutorService; * @author Eike Stepper * @since 4.3 */ -public class HomeFolderHandler implements InternalSecurityManager.CommitHandler2 +public class HomeFolderHandler implements InternalSecurityManager.CommitHandler2, IExecutorServiceProvider { public static final String DEFAULT_HOME_FOLDER = "/home"; diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/CommitManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/CommitManager.java index c7d88e390c..51511df7b9 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/CommitManager.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/CommitManager.java @@ -11,21 +11,25 @@ */ package org.eclipse.emf.cdo.internal.server; +import org.eclipse.emf.cdo.common.protocol.CDODataInput; import org.eclipse.emf.cdo.spi.server.InternalCommitContext; import org.eclipse.emf.cdo.spi.server.InternalCommitManager; import org.eclipse.emf.cdo.spi.server.InternalRepository; import org.eclipse.emf.cdo.spi.server.InternalTransaction; import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; +import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.concurrent.ThreadPool; +import org.eclipse.net4j.util.io.IOUtil; import org.eclipse.net4j.util.lifecycle.Lifecycle; import org.eclipse.net4j.util.om.monitor.OMMonitor; +import java.io.Closeable; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -62,8 +66,13 @@ public class CommitManager extends Lifecycle implements InternalCommitManager { if (executors == null) { - shutdownExecutorService = true; - executors = Executors.newFixedThreadPool(10); + executors = ConcurrencyUtil.getExecutorService(repository); + + if (executors == null) + { + shutdownExecutorService = true; + executors = ThreadPool.create(); + } } return executors; @@ -73,7 +82,11 @@ public class CommitManager extends Lifecycle implements InternalCommitManager { if (shutdownExecutorService) { - this.executors.shutdown(); + if (this.executors != null) + { + this.executors.shutdown(); + } + shutdownExecutorService = false; } @@ -87,12 +100,15 @@ public class CommitManager extends Lifecycle implements InternalCommitManager setExecutors(null); } - /** - * Create a future to execute commitContext in a different thread. - */ + @Deprecated public void preCommit(InternalCommitContext commitContext, OMMonitor monitor) { - TransactionCommitContextEntry contextEntry = new TransactionCommitContextEntry(monitor); + preCommit(commitContext, null, monitor); + } + + public void preCommit(InternalCommitContext commitContext, CDODataInput in, OMMonitor monitor) + { + TransactionCommitContextEntry contextEntry = new TransactionCommitContextEntry(in, monitor); contextEntry.setContext(commitContext); Future<Object> future = getExecutors().submit(contextEntry.createCallable()); @@ -145,14 +161,17 @@ public class CommitManager extends Lifecycle implements InternalCommitManager */ private static final class TransactionCommitContextEntry { + private final CDODataInput in; + + private final OMMonitor monitor; + private InternalCommitContext context; private Future<Object> future; - private OMMonitor monitor; - - public TransactionCommitContextEntry(OMMonitor monitor) + public TransactionCommitContextEntry(CDODataInput in, OMMonitor monitor) { + this.in = in; this.monitor = monitor; } @@ -163,6 +182,12 @@ public class CommitManager extends Lifecycle implements InternalCommitManager public Object call() throws Exception { context.write(monitor); + + if (in instanceof Closeable) + { + IOUtil.closeSilent((Closeable)in); + } + return null; } }; diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/QueryManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/QueryManager.java index 00a34fb1f4..42fa968be2 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/QueryManager.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/QueryManager.java @@ -28,6 +28,8 @@ import org.eclipse.emf.cdo.spi.server.InternalRepository; import org.eclipse.emf.cdo.spi.server.InternalSession; import org.eclipse.emf.cdo.spi.server.InternalView; +import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.concurrent.ThreadPool; import org.eclipse.net4j.util.container.IContainerDelta.Kind; import org.eclipse.net4j.util.container.SingleDeltaContainerEvent; import org.eclipse.net4j.util.event.IEvent; @@ -38,7 +40,6 @@ import org.eclipse.net4j.util.om.trace.ContextTracer; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -85,8 +86,13 @@ public class QueryManager extends Lifecycle implements InternalQueryManager { if (executors == null) { - shutdownExecutorService = true; - executors = Executors.newFixedThreadPool(10); + executors = ConcurrencyUtil.getExecutorService(repository); + + if (executors == null) + { + shutdownExecutorService = true; + executors = ThreadPool.create(); + } } return executors; @@ -96,7 +102,11 @@ public class QueryManager extends Lifecycle implements InternalQueryManager { if (shutdownExecutorService) { - this.executors.shutdown(); + if (this.executors != null) + { + this.executors.shutdown(); + } + shutdownExecutorService = false; } diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Repository.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Repository.java index 338e9de661..25ede606b0 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Repository.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/Repository.java @@ -104,6 +104,8 @@ import org.eclipse.net4j.util.StringUtil; import org.eclipse.net4j.util.WrappedException; import org.eclipse.net4j.util.collection.MoveableList; import org.eclipse.net4j.util.collection.Pair; +import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider; import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType; import org.eclipse.net4j.util.concurrent.RWOLockManager.LockState; import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException; @@ -138,13 +140,14 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; /** * @author Eike Stepper * @since 2.0 */ -public class Repository extends Container<Object>implements InternalRepository +public class Repository extends Container<Object>implements InternalRepository, IExecutorServiceProvider { private static final int UNCHUNKED = CDORevision.UNCHUNKED; @@ -1248,7 +1251,8 @@ public class Repository extends Container<Object>implements InternalRepository if (queryHandlerProvider == null) { - queryHandlerProvider = new ContainerQueryHandlerProvider(getContainer()); + IManagedContainer container = getContainer(); + queryHandlerProvider = new ContainerQueryHandlerProvider(container); } IQueryHandler handler = queryHandlerProvider.getQueryHandler(info); @@ -1275,6 +1279,12 @@ public class Repository extends Container<Object>implements InternalRepository this.container = container; } + public ExecutorService getExecutorService() + { + IManagedContainer container = getContainer(); + return ConcurrencyUtil.getExecutorService(container); + } + public Object[] getElements() { final Object[] elements = { packageRegistry, branchManager, revisionManager, sessionManager, queryManager, diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/InternalCommitManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/InternalCommitManager.java index 37ea0935c8..d5c072c028 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/InternalCommitManager.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/InternalCommitManager.java @@ -10,6 +10,8 @@ */ package org.eclipse.emf.cdo.spi.server; +import org.eclipse.emf.cdo.common.protocol.CDODataInput; + import org.eclipse.net4j.util.om.monitor.OMMonitor; import java.util.concurrent.ExecutionException; @@ -30,10 +32,19 @@ public interface InternalCommitManager /** * Create a future to execute commitContext in a different thread. + * + * @deprecated As of 4.5 use {@link #preCommit(InternalCommitContext, CDODataInput, OMMonitor)}. */ + @Deprecated public void preCommit(InternalCommitContext commitContext, OMMonitor monitor); /** + * Create a future to execute commitContext in a different thread. + * @since 4.5 + */ + public void preCommit(InternalCommitContext commitContext, CDODataInput in, OMMonitor monitor); + + /** * Called after a commitContext is done successfully or not. */ public void remove(InternalCommitContext commitContext); diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/LockingManagerTest.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/LockingManagerTest.java index 199c922446..0746b51808 100644 --- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/LockingManagerTest.java +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/LockingManagerTest.java @@ -42,7 +42,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import junit.framework.AssertionFailedError; @@ -184,7 +183,7 @@ public class LockingManagerTest extends AbstractLockingTest } }; - ExecutorService executors = Executors.newFixedThreadPool(10); + ExecutorService executors = getExecutorService(); Set<Integer> keys = new HashSet<Integer>(); keys.add(1); keys.add(2); diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java index ae59a321aa..699e2d2303 100644 --- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java @@ -13,9 +13,11 @@ package org.eclipse.emf.cdo.tests.config.impl; import org.eclipse.emf.cdo.tests.config.IConfig; import org.eclipse.net4j.util.ObjectUtil; +import org.eclipse.net4j.util.concurrent.ThreadPool; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; /** * @author Eike Stepper @@ -24,6 +26,8 @@ public abstract class Config implements IConfig { private static final long serialVersionUID = 1L; + protected static ExecutorService executorService = ThreadPool.create("test", 20, 100, 30); + private String name; private transient ConfigTest currentTest; @@ -109,4 +113,9 @@ public abstract class Config implements IConfig public void tearDown() throws Exception { } + + public static ExecutorService getExecutorService() + { + return executorService; + } } diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTest.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTest.java index d56556a93c..0b8541d715 100644 --- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTest.java +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTest.java @@ -78,6 +78,7 @@ import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutorService; import junit.framework.TestResult; @@ -100,6 +101,11 @@ public abstract class ConfigTest extends AbstractOMTest implements IConstants { } + public ExecutorService getExecutorService() + { + return Config.getExecutorService(); + } + public synchronized IScenario getScenario() { if (scenario == null) @@ -574,8 +580,8 @@ public abstract class ConfigTest extends AbstractOMTest implements IConstants @Override public String toString() { - return MessageFormat.format("{0}.{1} [{2}, {3}, {4}]", getClass().getSimpleName(), getName(), - getRepositoryConfig(), getSessionConfig(), getModelConfig()); + return MessageFormat.format("{0}.{1} [{2}, {3}, {4}]", getClass().getSimpleName(), getName(), getRepositoryConfig(), + getSessionConfig(), getModelConfig()); } @Override @@ -861,7 +867,7 @@ public abstract class ConfigTest extends AbstractOMTest implements IConstants @Target({ ElementType.TYPE, ElementType.METHOD }) public @interface Requires { - String[] value(); + String[]value(); } @Inherited @@ -869,6 +875,6 @@ public abstract class ConfigTest extends AbstractOMTest implements IConstants @Target({ ElementType.TYPE, ElementType.METHOD }) public @interface Skips { - String[] value(); + String[]value(); } } diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTestSuite.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTestSuite.java index d69d5f5428..5b8964115a 100644 --- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTestSuite.java +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/ConfigTestSuite.java @@ -71,7 +71,7 @@ public abstract class ConfigTestSuite implements IConstants { try { - TestWrapper wrapper = new TestWrapper(testClass, scenario, this); + ScenarioSuite wrapper = new ScenarioSuite(testClass, scenario, this); if (wrapper.testCount() != 0) { suite.addTest(wrapper); @@ -152,36 +152,28 @@ public abstract class ConfigTestSuite implements IConstants /** * @author Eike Stepper */ - private static final class ConstraintsViolatedException extends Exception - { - private static final long serialVersionUID = 1L; - } - - /** - * @author Eike Stepper - */ - private static final class TestWrapper extends TestSuite + private static final class ScenarioSuite extends TestSuite { private IScenario scenario; - - public TestWrapper(Class<? extends ConfigTest> testClass, IScenario scenario, ConfigTestSuite suite) + + public ScenarioSuite(Class<? extends ConfigTest> testClass, IScenario scenario, ConfigTestSuite suite) throws ConstraintsViolatedException { // super(testClass, testClass.getName()); // Important for the UI to set the *qualified* class name! this.scenario = scenario; addTestsFromTestCase(testClass, suite); } - + @Override public void runTest(Test test, TestResult result) { if (test instanceof ConfigTest) { scenario.save(); - + ConfigTest configTest = (ConfigTest)test; configTest.setScenario(scenario); - + if (configTest.isValid()) { super.runTest(configTest, result); @@ -192,12 +184,12 @@ public abstract class ConfigTestSuite implements IConstants super.runTest(test, result); } } - + private void addTestsFromTestCase(final Class<?> theClass, ConfigTestSuite suite) throws ConstraintsViolatedException { setName(theClass.getName()); - + try { getTestConstructor(theClass); // Avoid generating multiple error messages @@ -208,15 +200,15 @@ public abstract class ConfigTestSuite implements IConstants + " has no public constructor TestCase(String name) or TestCase()")); return; } - + if (!Modifier.isPublic(theClass.getModifiers())) { addTest(warning("Class " + theClass.getName() + " is not public")); return; } - + Set<String> capabilities = scenario.getCapabilities(); - + Class<?> superClass = theClass; while (Test.class.isAssignableFrom(superClass)) { @@ -224,12 +216,12 @@ public abstract class ConfigTestSuite implements IConstants { throw new ConstraintsViolatedException(); } - + superClass = superClass.getSuperclass(); } - + List<String> names = new ArrayList<String>(); - + superClass = theClass; while (Test.class.isAssignableFrom(superClass)) { @@ -240,11 +232,11 @@ public abstract class ConfigTestSuite implements IConstants addTestMethod(method, names, theClass, suite); } } - + superClass = superClass.getSuperclass(); } } - + private boolean validateConstraints(AnnotatedElement element, Set<String> capabilities) { Requires requires = element.getAnnotation(Requires.class); @@ -258,7 +250,7 @@ public abstract class ConfigTestSuite implements IConstants } } } - + Skips skips = element.getAnnotation(Skips.class); if (skips != null) { @@ -270,10 +262,10 @@ public abstract class ConfigTestSuite implements IConstants } } } - + return true; } - + private void addTestMethod(Method m, List<String> names, Class<?> theClass, ConfigTestSuite suite) { String name = m.getName(); @@ -281,17 +273,17 @@ public abstract class ConfigTestSuite implements IConstants { return; } - + if (!isPublicTestMethod(m)) { if (isTestMethod(m)) { addTest(warning("Test method isn't public: " + m.getName() + "(" + theClass.getCanonicalName() + ")")); } - + return; } - + names.add(name); Test test = createTest(theClass, name); if (test instanceof ConfigTest) @@ -299,18 +291,26 @@ public abstract class ConfigTestSuite implements IConstants ConfigTest configTest = (ConfigTest)test; suite.prepareTest(configTest); } - + addTest(test); } - + private boolean isPublicTestMethod(Method m) { return isTestMethod(m) && Modifier.isPublic(m.getModifiers()); } - + private boolean isTestMethod(Method m) { return m.getParameterTypes().length == 0 && m.getName().startsWith("test") && m.getReturnType().equals(Void.TYPE); } } + + /** + * @author Eike Stepper + */ + private static final class ConstraintsViolatedException extends Exception + { + private static final long serialVersionUID = 1L; + } } diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java index 76a9ff1782..d3e8aeb855 100644 --- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java @@ -64,6 +64,8 @@ import org.eclipse.net4j.jvm.JVMUtil; import org.eclipse.net4j.util.ObjectUtil; import org.eclipse.net4j.util.ReflectUtil; import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.concurrent.DelegatingExecutorService; +import org.eclipse.net4j.util.concurrent.ExecutorServiceFactory; import org.eclipse.net4j.util.container.ContainerUtil; import org.eclipse.net4j.util.container.IManagedContainer; import org.eclipse.net4j.util.event.IEvent; @@ -92,6 +94,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ExecutorService; /** * @author Eike Stepper @@ -242,6 +245,16 @@ public abstract class RepositoryConfig extends Config implements IRepositoryConf IManagedContainer container = ContainerUtil.createContainer(); Net4jUtil.prepareContainer(container); CDONet4jServerUtil.prepareContainer(container); + + container.registerFactory(new ExecutorServiceFactory() + { + @Override + public ExecutorService create(String threadGroupName) + { + return new DelegatingExecutorService(executorService); + } + }); + return container; } @@ -533,8 +546,8 @@ public abstract class RepositoryConfig extends Config implements IRepositoryConf if (!path.startsWith(prefix) && !hasAnnotation(CleanRepositoriesBefore.class)) { throw new RuntimeException("Test case " + test.getClass().getName() + '.' + test.getName() - + " does not use getResourcePath() for resource " + path + ", nor does it declare @" - + CleanRepositoriesBefore.class.getSimpleName()); + + " does not use getResourcePath() for resource " + path + ", nor does it declare @" + + CleanRepositoriesBefore.class.getSimpleName()); } } } diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java index 8cb638ea12..4f3ed8bbe3 100644 --- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java @@ -32,6 +32,8 @@ import org.eclipse.net4j.connector.IConnector; import org.eclipse.net4j.jvm.JVMUtil; import org.eclipse.net4j.tcp.TCPUtil; import org.eclipse.net4j.tcp.ssl.SSLUtil; +import org.eclipse.net4j.util.concurrent.DelegatingExecutorService; +import org.eclipse.net4j.util.concurrent.ExecutorServiceFactory; import org.eclipse.net4j.util.container.ContainerUtil; import org.eclipse.net4j.util.container.IManagedContainer; import org.eclipse.net4j.util.event.IListener; @@ -42,13 +44,13 @@ import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.security.IPasswordCredentialsProvider; import org.eclipse.emf.ecore.EPackage; -import org.eclipse.emf.ecore.EPackage.Registry; import org.eclipse.emf.ecore.impl.EPackageImpl; import java.io.File; import java.util.Date; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ExecutorService; /** * @author Eike Stepper @@ -61,7 +63,7 @@ public abstract class SessionConfig extends Config implements ISessionConfig public static final String PROP_TEST_FETCH_RULE_MANAGER = "test.session.FetchRuleManager"; - private static final Registry GLOBAL_REGISTRY = EPackage.Registry.INSTANCE; + private static final EPackage.Registry GLOBAL_REGISTRY = EPackage.Registry.INSTANCE; private static final long serialVersionUID = 1L; @@ -123,6 +125,16 @@ public abstract class SessionConfig extends Config implements ISessionConfig { IManagedContainer container = ContainerUtil.createContainer(); Net4jUtil.prepareContainer(container); + + container.registerFactory(new ExecutorServiceFactory() + { + @Override + public ExecutorService create(String threadGroupName) + { + return new DelegatingExecutorService(executorService); + } + }); + return container; } diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java index 9062449980..4e992c18e2 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java @@ -96,10 +96,12 @@ import org.eclipse.emf.internal.cdo.util.DefaultLocksChangedEvent; import org.eclipse.net4j.util.AdapterUtil; import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; import org.eclipse.net4j.util.WrappedException; +import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer; +import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider; import org.eclipse.net4j.util.concurrent.IRWLockManager; import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType; import org.eclipse.net4j.util.concurrent.IRWOLockManager; -import org.eclipse.net4j.util.concurrent.QueueRunner2; import org.eclipse.net4j.util.concurrent.RWOLockManager; import org.eclipse.net4j.util.event.Event; import org.eclipse.net4j.util.event.EventUtil; @@ -149,11 +151,13 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ExecutorService; /** * @author Eike Stepper */ -public abstract class CDOSessionImpl extends CDOTransactionContainerImpl implements InternalCDOSession +public abstract class CDOSessionImpl extends CDOTransactionContainerImpl + implements InternalCDOSession, IExecutorServiceProvider { private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SESSION, CDOSessionImpl.class); @@ -363,6 +367,11 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme return fetchRuleManager; } + public ExecutorService getExecutorService() + { + return ConcurrencyUtil.getExecutorService(sessionProtocol); + } + /** * @since 3.0 */ @@ -1308,6 +1317,9 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme protected void doAfterActivate() throws Exception { super.doAfterActivate(); + + ExecutorService executorService = ConcurrencyUtil.getExecutorService(sessionProtocol); + invalidator.setExecutor(executorService); LifecycleUtil.activate(invalidator); } @@ -1738,7 +1750,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme /** * @author Eike Stepper */ - private class Invalidator extends QueueRunner2<Invalidation> + private class Invalidator extends ExecutorWorkSerializer { private static final boolean DEBUG = false; @@ -1820,11 +1832,11 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme } @Override - protected void noWork(WorkContext context) + protected void noWork() { if (isClosed() && terminateIfSessionClosed) { - context.terminate(); + dispose(); } } @@ -1834,12 +1846,6 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme super.doAfterActivate(); terminateIfSessionClosed = true; } - - @Override - protected String getThreadName() - { - return "CDOSessionInvalidator-" + CDOSessionImpl.this; - } } /** diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/DelegatingSessionProtocol.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/DelegatingSessionProtocol.java index f271273012..7170faedd9 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/DelegatingSessionProtocol.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/DelegatingSessionProtocol.java @@ -47,6 +47,8 @@ import org.eclipse.emf.cdo.view.CDOView; import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; import org.eclipse.net4j.util.WrappedException; import org.eclipse.net4j.util.collection.Pair; +import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider; import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType; import org.eclipse.net4j.util.event.EventUtil; import org.eclipse.net4j.util.event.IListener; @@ -70,11 +72,12 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; /** * @author Eike Stepper */ -public class DelegatingSessionProtocol extends Lifecycle implements CDOSessionProtocol +public class DelegatingSessionProtocol extends Lifecycle implements CDOSessionProtocol, IExecutorServiceProvider { private CDOSessionProtocol delegate; @@ -122,6 +125,11 @@ public class DelegatingSessionProtocol extends Lifecycle implements CDOSessionPr return (CDOSession)delegate.getSession(); } + public ExecutorService getExecutorService() + { + return ConcurrencyUtil.getExecutorService(delegate); + } + public boolean cancelQuery(int queryId) { int attempt = 0; diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java index 8b6cd1ca12..1a28863d05 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java @@ -67,8 +67,11 @@ import org.eclipse.net4j.util.ObjectUtil; import org.eclipse.net4j.util.WrappedException; import org.eclipse.net4j.util.collection.HashBag; import org.eclipse.net4j.util.collection.Pair; +import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer; +import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider; import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType; -import org.eclipse.net4j.util.concurrent.QueueRunner; +import org.eclipse.net4j.util.concurrent.IWorkSerializer; import org.eclipse.net4j.util.event.IEvent; import org.eclipse.net4j.util.event.IListener; import org.eclipse.net4j.util.event.Notifier; @@ -111,11 +114,12 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.ExecutorService; /** * @author Eike Stepper */ -public class CDOViewImpl extends AbstractCDOView +public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProvider { private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_VIEW, CDOViewImpl.class); @@ -137,7 +141,7 @@ public class CDOViewImpl extends AbstractCDOView private Map<CDOObject, CDOLockState> lockStates = new WeakHashMap<CDOObject, CDOLockState>(); - private QueueRunner invalidationRunner = createInvalidationRunner(); + private ExecutorWorkSerializer invalidationRunner = createInvalidationRunner(); private volatile boolean invalidationRunnerActive; @@ -177,6 +181,11 @@ public class CDOViewImpl extends AbstractCDOView viewID = viewId; } + public ExecutorService getExecutorService() + { + return ConcurrencyUtil.getExecutorService(session); + } + /** * @since 2.0 */ @@ -932,8 +941,8 @@ public class CDOViewImpl extends AbstractCDOView { if (async) { - QueueRunner runner = getInvalidationRunner(); - runner.addWork(new InvalidationRunnable(branch, lastUpdateTime, allChangedObjects, allDetachedObjects, + IWorkSerializer serializer = getInvalidationRunner(); + serializer.addWork(new InvalidationRunnable(branch, lastUpdateTime, allChangedObjects, allDetachedObjects, oldRevisions, clearResourcePathCache)); } else @@ -997,35 +1006,23 @@ public class CDOViewImpl extends AbstractCDOView } } - public QueueRunner getInvalidationRunner() + public ExecutorWorkSerializer getInvalidationRunner() { return invalidationRunner; } - private QueueRunner createInvalidationRunner() + private ExecutorWorkSerializer createInvalidationRunner() { - return new QueueRunner() + return new ExecutorWorkSerializer() { @Override - protected String getThreadName() - { - return "CDOViewInvalidationRunner-" + CDOViewImpl.this; //$NON-NLS-1$ - } - - @Override - protected void noWork(WorkContext context) + protected void noWork() { if (isClosed()) { - context.terminate(); + dispose(); } } - - @Override - public String toString() - { - return getThreadName(); - } }; } @@ -1279,9 +1276,12 @@ public class CDOViewImpl extends AbstractCDOView { super.doAfterActivate(); + ExecutorService executorService = ConcurrencyUtil.getExecutorService(session); + invalidationRunner.setExecutor(executorService); + try { - invalidationRunner.activate(); + LifecycleUtil.activate(invalidationRunner); } catch (LifecycleException ex) { diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/AllTests.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/AllTests.java index 1f9c1a12ee..f39d73c3c3 100644 --- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/AllTests.java +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/AllTests.java @@ -13,10 +13,10 @@ package org.eclipse.net4j.tests; import org.eclipse.net4j.tests.bugzilla.Bugzilla_241463_Test; import org.eclipse.net4j.tests.bugzilla.Bugzilla_259086_Test; import org.eclipse.net4j.tests.bugzilla.Bugzilla_262875_Test; +import org.eclipse.net4j.util.tests.ExecutorWorkSerializerTest; import org.eclipse.net4j.util.tests.ExpectedIOTest; import org.eclipse.net4j.util.tests.ExtendedIOTest; import org.eclipse.net4j.util.tests.MultiMapTest; -import org.eclipse.net4j.util.tests.QueueWorkerWorkSerializerTest; import org.eclipse.net4j.util.tests.ReferenceValueMapTest; import org.eclipse.net4j.util.tests.SecurityTest; import org.eclipse.net4j.util.tests.SortedFileMapTest; @@ -52,7 +52,7 @@ public class AllTests suite.addTestSuite(SignalMonitorTest.TCP.class); suite.addTestSuite(ExceptionTest.TCP.class); suite.addTestSuite(SecurityTest.class); - suite.addTestSuite(QueueWorkerWorkSerializerTest.class); + suite.addTestSuite(ExecutorWorkSerializerTest.class); suite.addTestSuite(ExpectedIOTest.class); // Bugzillas diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/TCPConnectorTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/TCPConnectorTest.java index ccc0ff7cd4..d3c555cc28 100644 --- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/TCPConnectorTest.java +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/TCPConnectorTest.java @@ -31,6 +31,7 @@ import org.eclipse.net4j.tcp.TCPUtil; import org.eclipse.net4j.tests.bundle.OM; import org.eclipse.net4j.util.collection.RoundRobinBlockingQueue; import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.concurrent.ThreadPool; import org.eclipse.net4j.util.io.IOUtil; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.security.ChallengeNegotiator; @@ -47,7 +48,6 @@ import org.eclipse.spi.net4j.InternalChannel; import java.nio.channels.ServerSocketChannel; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * @author Eike Stepper @@ -221,9 +221,7 @@ public class TCPConnectorTest extends AbstractTransportTest public void testDeferredActivation() throws Exception { final long DELAY = 500L; - threadPool = Executors.newCachedThreadPool(); - LifecycleUtil.activate(threadPool); - + threadPool = ThreadPool.create(); bufferPool = Net4jUtil.createBufferPool(); LifecycleUtil.activate(bufferPool); @@ -272,9 +270,7 @@ public class TCPConnectorTest extends AbstractTransportTest public void testNegotiationSuccess() throws Exception { - threadPool = Executors.newCachedThreadPool(); - LifecycleUtil.activate(threadPool); - + threadPool = ThreadPool.create(); bufferPool = Net4jUtil.createBufferPool(); LifecycleUtil.activate(bufferPool); @@ -351,9 +347,7 @@ public class TCPConnectorTest extends AbstractTransportTest public void testInvalidUser() throws Exception { - threadPool = Executors.newCachedThreadPool(); - LifecycleUtil.activate(threadPool); - + threadPool = ThreadPool.create(); bufferPool = Net4jUtil.createBufferPool(); LifecycleUtil.activate(bufferPool); @@ -412,9 +406,7 @@ public class TCPConnectorTest extends AbstractTransportTest public void testInvalidPassword() throws Exception { - threadPool = Executors.newCachedThreadPool(); - LifecycleUtil.activate(threadPool); - + threadPool = ThreadPool.create(); bufferPool = Net4jUtil.createBufferPool(); LifecycleUtil.activate(bufferPool); @@ -473,9 +465,7 @@ public class TCPConnectorTest extends AbstractTransportTest public void testNoNegotiator() throws Exception { - threadPool = Executors.newCachedThreadPool(); - LifecycleUtil.activate(threadPool); - + threadPool = ThreadPool.create(); bufferPool = Net4jUtil.createBufferPool(); LifecycleUtil.activate(bufferPool); @@ -513,9 +503,7 @@ public class TCPConnectorTest extends AbstractTransportTest public void testNegotiatorTooLate() throws Exception { - threadPool = Executors.newCachedThreadPool(); - LifecycleUtil.activate(threadPool); - + threadPool = ThreadPool.create(); bufferPool = Net4jUtil.createBufferPool(); LifecycleUtil.activate(bufferPool); diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMAcceptorDefImplTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMAcceptorDefImplTest.java index 5fce1508d8..1abe83ff23 100644 --- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMAcceptorDefImplTest.java +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMAcceptorDefImplTest.java @@ -18,12 +18,12 @@ import org.eclipse.net4j.defs.Net4jDefsFactory; import org.eclipse.net4j.internal.jvm.JVMClientConnector; import org.eclipse.net4j.jvm.IJVMAcceptor; import org.eclipse.net4j.jvm.IJVMConnector; +import org.eclipse.net4j.util.concurrent.ThreadPool; import org.eclipse.net4j.util.defs.Net4jUtilDefsFactory; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.tests.AbstractOMTest; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * @author Andre Dietisheim @@ -88,8 +88,6 @@ public class JVMAcceptorDefImplTest extends AbstractOMTest private ExecutorService createThreadPool() { - ExecutorService threadPool = Executors.newCachedThreadPool(); - LifecycleUtil.activate(threadPool); - return threadPool; + return ThreadPool.create(); } } diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMConnectorDefImplTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMConnectorDefImplTest.java index 0efd62a212..97d079626b 100644 --- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMConnectorDefImplTest.java +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/JVMConnectorDefImplTest.java @@ -18,12 +18,12 @@ import org.eclipse.net4j.defs.Net4jDefsFactory; import org.eclipse.net4j.internal.jvm.JVMAcceptor; import org.eclipse.net4j.jvm.IJVMAcceptor; import org.eclipse.net4j.jvm.IJVMConnector; +import org.eclipse.net4j.util.concurrent.ThreadPool; import org.eclipse.net4j.util.defs.Net4jUtilDefsFactory; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.tests.AbstractOMTest; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * @author Andre Dietisheim @@ -63,8 +63,7 @@ public class JVMConnectorDefImplTest extends AbstractOMTest private IJVMAcceptor createJVMAcceptor() { - ExecutorService threadPool = Executors.newCachedThreadPool(); - LifecycleUtil.activate(threadPool); + ExecutorService threadPool = ThreadPool.create(); IBufferPool bufferPool = Net4jUtil.createBufferPool(); LifecycleUtil.activate(bufferPool); diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/Util.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/Util.java index 5337a886d0..2c04cdfe9c 100644 --- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/Util.java +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/tests/defs/Util.java @@ -16,6 +16,7 @@ import org.eclipse.net4j.internal.tcp.TCPAcceptor; import org.eclipse.net4j.internal.tcp.TCPClientConnector; import org.eclipse.net4j.internal.tcp.TCPConnector; import org.eclipse.net4j.internal.tcp.TCPSelector; +import org.eclipse.net4j.util.concurrent.ThreadPool; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.security.ChallengeNegotiator; import org.eclipse.net4j.util.security.INegotiator; @@ -27,7 +28,6 @@ import org.eclipse.net4j.util.security.ResponseNegotiator; import org.eclipse.net4j.util.security.UserManager; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * @author Eike Stepper @@ -131,8 +131,6 @@ class Util public static ExecutorService createThreadPool() { - ExecutorService threadPool = Executors.newCachedThreadPool(); - LifecycleUtil.activate(threadPool); - return threadPool; + return ThreadPool.create(); } } diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/AbstractOMTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/AbstractOMTest.java index 78f60077cf..950d8db519 100644 --- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/AbstractOMTest.java +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/AbstractOMTest.java @@ -58,8 +58,14 @@ import junit.framework.TestResult; */ public abstract class AbstractOMTest extends TestCase { + /** + * Timeout duration in millseconds if timeout <b>is not</b> expected. + */ public static final long DEFAULT_TIMEOUT = 15 * 1000; + /** + * Timeout duration in millseconds if timeout <b>is</b> expected. + */ public static final long DEFAULT_TIMEOUT_EXPECTED = 3 * 1000; public static boolean EXTERNAL_LOG; diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/QueueWorkerWorkSerializerTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java index 0ee4714626..4aea456412 100644 --- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/QueueWorkerWorkSerializerTest.java +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java @@ -11,7 +11,7 @@ */ package org.eclipse.net4j.util.tests; -import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer; +import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer; import org.eclipse.net4j.util.io.IOUtil; import java.util.Random; @@ -22,11 +22,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** - * A test for {@link QueueWorkerWorkSerializer}. + * A test for {@link ExecutorWorkSerializer}. * * @author Andre Dietisheim */ -public class QueueWorkerWorkSerializerTest extends AbstractOMTest +public class ExecutorWorkSerializerTest extends AbstractOMTest { /** timeout to wait for execution of all work units. */ private static final int WORK_COMPLETION_TIMEOUT = 10000; @@ -47,22 +47,24 @@ public class QueueWorkerWorkSerializerTest extends AbstractOMTest private ExecutorService threadPool; /** The queue worker to submit the work units to. */ - private QueueWorkerWorkSerializer queueWorker; + private ExecutorWorkSerializer queueWorker; @Override public void setUp() { - threadPool = Executors.newFixedThreadPool(NUM_WORKPRODUCER_THREADS); - workConsumedLatch = new CountDownLatch(NUM_WORK); - queueWorker = new QueueWorkerWorkSerializer(); workProduced = new AtomicInteger(0); + workConsumedLatch = new CountDownLatch(NUM_WORK); + + threadPool = Executors.newFixedThreadPool(NUM_WORKPRODUCER_THREADS); + queueWorker = new ExecutorWorkSerializer(threadPool); + queueWorker.activate(); } @Override public void tearDown() { - threadPool.shutdown(); queueWorker.dispose(); + threadPool.shutdown(); } /** diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java new file mode 100644 index 0000000000..8ce34d151e --- /dev/null +++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.tests; + +import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.concurrent.ThreadPool; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * A test for {@link ThreadPool}. + * + * @author Eike Stepper + */ +public class ThreadPoolTest extends AbstractOMTest +{ + public void testThreadPool() throws Exception + { + final ThreadPool pool = ThreadPool.create("test", 100, 200, 60); + + try + { + final int tasks = pool.getMaximumPoolSize() + 100; + final CountDownLatch latch = new CountDownLatch(tasks); + + for (int i = 0; i < tasks; i++) + { + final int n = i; + msg("scheduling " + n); + pool.submit(new Runnable() + { + public void run() + { + msg("started " + n + " (wc=" + pool.getPoolSize() + ")"); + ConcurrencyUtil.sleep(1000); + latch.countDown(); + } + }); + } + + latch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); + msg("FINISHED"); + } + finally + { + pool.shutdownNow(); + } + } +} diff --git a/plugins/org.eclipse.net4j.util.defs/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j.util.defs/META-INF/MANIFEST.MF index 5308210d6e..03926bce84 100644 --- a/plugins/org.eclipse.net4j.util.defs/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.net4j.util.defs/META-INF/MANIFEST.MF @@ -2,14 +2,14 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: %pluginName Bundle-SymbolicName: org.eclipse.net4j.util.defs;singleton:=true -Bundle-Version: 4.1.200.qualifier +Bundle-Version: 4.1.300.qualifier Bundle-ClassPath: . Bundle-Vendor: %providerName Bundle-Localization: plugin Bundle-RequiredExecutionEnvironment: J2SE-1.5 -Export-Package: org.eclipse.net4j.util.defs;version="4.1.200", - org.eclipse.net4j.util.defs.impl;version="4.1.200", - org.eclipse.net4j.util.defs.util;version="4.1.200" +Export-Package: org.eclipse.net4j.util.defs;version="4.1.300", + org.eclipse.net4j.util.defs.impl;version="4.1.300", + org.eclipse.net4j.util.defs.util;version="4.1.300" Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";visibility:=reexport, org.eclipse.emf.ecore;bundle-version="[2.5.0,3.0.0)";visibility:=reexport, org.eclipse.net4j.util;bundle-version="[3.0.0,4.0.0)";visibility:=reexport diff --git a/plugins/org.eclipse.net4j.util.defs/src/org/eclipse/net4j/util/defs/impl/ThreadPoolDefImpl.java b/plugins/org.eclipse.net4j.util.defs/src/org/eclipse/net4j/util/defs/impl/ThreadPoolDefImpl.java index 777da0dc40..1e797958ea 100644 --- a/plugins/org.eclipse.net4j.util.defs/src/org/eclipse/net4j/util/defs/impl/ThreadPoolDefImpl.java +++ b/plugins/org.eclipse.net4j.util.defs/src/org/eclipse/net4j/util/defs/impl/ThreadPoolDefImpl.java @@ -11,14 +11,13 @@ */ package org.eclipse.net4j.util.defs.impl; +import org.eclipse.net4j.util.concurrent.ThreadPool; import org.eclipse.net4j.util.defs.Net4jUtilDefsPackage; import org.eclipse.net4j.util.defs.ThreadPoolDef; import org.eclipse.emf.ecore.EClass; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; /** * <!-- begin-user-doc --> An implementation of the model object '<em><b>Thread Pool Def</b></em>'. <!-- end-user-doc @@ -30,9 +29,6 @@ import java.util.concurrent.ThreadFactory; */ public class ThreadPoolDefImpl extends ExecutorServiceDefImpl implements ThreadPoolDef { - - private static final String THREADGROUP_IDENTIFIER = "net4j"; //$NON-NLS-1$ - /** * <!-- begin-user-doc --> <!-- end-user-doc --> * @generated @@ -61,26 +57,8 @@ public class ThreadPoolDefImpl extends ExecutorServiceDefImpl implements ThreadP @Override protected Object createInstance() { - ExecutorService executorService = Executors.newCachedThreadPool(new DaemonThreadFactory(THREADGROUP_IDENTIFIER)); + ExecutorService executorService = ThreadPool.create(); return executorService; } - private static class DaemonThreadFactory implements ThreadFactory - { - private ThreadGroup threadGroup; - - public DaemonThreadFactory(String threadGroupIdentifier) - { - super(); - threadGroup = new ThreadGroup(threadGroupIdentifier); - } - - public Thread newThread(Runnable r) - { - Thread thread = new Thread(threadGroup, r); - thread.setDaemon(true); - return thread; - } - } - } // ThreadPoolDefImpl diff --git a/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF index e85dcff502..a1071bf9f4 100644 --- a/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF @@ -1,7 +1,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-SymbolicName: org.eclipse.net4j.util;singleton:=true -Bundle-Version: 3.5.0.qualifier +Bundle-Version: 3.6.0.qualifier Bundle-Name: %pluginName Bundle-Vendor: %providerName Bundle-Localization: plugin @@ -15,34 +15,34 @@ Import-Package: org.eclipse.osgi.service.debug;version="[1.0.0,2.0.0)";resolutio org.osgi.framework;version="[1.3.0,2.0.0)";resolution:=optional, org.osgi.service.log;version="[1.3.0,2.0.0)";resolution:=optional, org.osgi.util.tracker;version="[1.3.0,2.0.0)";resolution:=optional -Export-Package: org.eclipse.net4j.internal.util.bundle;version="3.5.0";x-friends:="org.eclipse.net4j.util.ui,org.eclipse.net4j.tests", - org.eclipse.net4j.internal.util.container;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs", - org.eclipse.net4j.internal.util.factory;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs", - org.eclipse.net4j.internal.util.om;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs", - org.eclipse.net4j.internal.util.om.pref;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs", - org.eclipse.net4j.internal.util.table;version="3.5.0";x-internal:=true, - org.eclipse.net4j.internal.util.test;version="3.5.0";x-friends:="org.eclipse.net4j.tests", - org.eclipse.net4j.util;version="3.5.0", - org.eclipse.net4j.util.cache;version="3.5.0", - org.eclipse.net4j.util.collection;version="3.5.0", - org.eclipse.net4j.util.concurrent;version="3.5.0", - org.eclipse.net4j.util.confirmation;version="3.5.0", - org.eclipse.net4j.util.container;version="3.5.0", - org.eclipse.net4j.util.container.delegate;version="3.5.0", - org.eclipse.net4j.util.event;version="3.5.0", - org.eclipse.net4j.util.factory;version="3.5.0", - org.eclipse.net4j.util.fsm;version="3.5.0", - org.eclipse.net4j.util.io;version="3.5.0", - org.eclipse.net4j.util.lifecycle;version="3.5.0", - org.eclipse.net4j.util.om;version="3.5.0", - org.eclipse.net4j.util.om.log;version="3.5.0", - org.eclipse.net4j.util.om.monitor;version="3.5.0", - org.eclipse.net4j.util.om.pref;version="3.5.0", - org.eclipse.net4j.util.om.trace;version="3.5.0", - org.eclipse.net4j.util.options;version="3.5.0", - org.eclipse.net4j.util.properties;version="3.5.0", - org.eclipse.net4j.util.ref;version="3.5.0", - org.eclipse.net4j.util.registry;version="3.5.0", - org.eclipse.net4j.util.security;version="3.5.0", - org.eclipse.net4j.util.transaction;version="3.5.0" +Export-Package: org.eclipse.net4j.internal.util.bundle;version="3.6.0";x-friends:="org.eclipse.net4j.util.ui,org.eclipse.net4j.tests", + org.eclipse.net4j.internal.util.container;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs", + org.eclipse.net4j.internal.util.factory;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs", + org.eclipse.net4j.internal.util.om;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs", + org.eclipse.net4j.internal.util.om.pref;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs", + org.eclipse.net4j.internal.util.table;version="3.6.0";x-internal:=true, + org.eclipse.net4j.internal.util.test;version="3.6.0";x-friends:="org.eclipse.net4j.tests", + org.eclipse.net4j.util;version="3.6.0", + org.eclipse.net4j.util.cache;version="3.6.0", + org.eclipse.net4j.util.collection;version="3.6.0", + org.eclipse.net4j.util.concurrent;version="3.6.0", + org.eclipse.net4j.util.confirmation;version="3.6.0", + org.eclipse.net4j.util.container;version="3.6.0", + org.eclipse.net4j.util.container.delegate;version="3.6.0", + org.eclipse.net4j.util.event;version="3.6.0", + org.eclipse.net4j.util.factory;version="3.6.0", + org.eclipse.net4j.util.fsm;version="3.6.0", + org.eclipse.net4j.util.io;version="3.6.0", + org.eclipse.net4j.util.lifecycle;version="3.6.0", + org.eclipse.net4j.util.om;version="3.6.0", + org.eclipse.net4j.util.om.log;version="3.6.0", + org.eclipse.net4j.util.om.monitor;version="3.6.0", + org.eclipse.net4j.util.om.pref;version="3.6.0", + org.eclipse.net4j.util.om.trace;version="3.6.0", + org.eclipse.net4j.util.options;version="3.6.0", + org.eclipse.net4j.util.properties;version="3.6.0", + org.eclipse.net4j.util.ref;version="3.6.0", + org.eclipse.net4j.util.registry;version="3.6.0", + org.eclipse.net4j.util.security;version="3.6.0", + org.eclipse.net4j.util.transaction;version="3.6.0" Eclipse-BuddyPolicy: registered diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java index 887dc5693a..80133bffe3 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java @@ -19,8 +19,10 @@ import java.util.concurrent.ExecutorService; /** * @author Eike Stepper + * @deprecated As of 3.6 use {@link ExecutorWorkSerializer}. */ -public class AsynchronousWorkSerializer implements IWorkSerializer, Runnable +@Deprecated +public class AsynchronousWorkSerializer implements IWorkSerializer, IExecutorServiceProvider, Runnable { private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONCURRENCY, AsynchronousWorkSerializer.class); diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java index 9b4dc04680..69534b6a06 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java @@ -18,7 +18,9 @@ import java.util.concurrent.Future; /** * @author Eike Stepper + * @deprecated As of 3.6 use {@link ExecutorWorkSerializer}. */ +@Deprecated public class CompletionWorkSerializer implements IWorkSerializer { private CompletionService<Object> completionService; diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java index 303cf02b29..5a9905e821 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java @@ -54,4 +54,36 @@ public final class ConcurrencyUtil { return ExecutorServiceFactory.get(container); } + + /** + * @since 3.6 + */ + public static ExecutorService getExecutorService(Object object) + { + if (object instanceof IExecutorServiceProvider) + { + try + { + return ((IExecutorServiceProvider)object).getExecutorService(); + } + catch (Exception ex) + { + //$FALL-THROUGH$ + } + } + + if (object instanceof IManagedContainer) + { + try + { + return getExecutorService((IManagedContainer)object); + } + catch (Exception ex) + { + //$FALL-THROUGH$ + } + } + + return null; + } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/DelegatingExecutorService.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/DelegatingExecutorService.java new file mode 100644 index 0000000000..cbd4806086 --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/DelegatingExecutorService.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2004-2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.concurrent; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public class DelegatingExecutorService implements ExecutorService +{ + private final ExecutorService delegate; + + public DelegatingExecutorService(ExecutorService delegate) + { + this.delegate = delegate; + } + + public void execute(Runnable command) + { + delegate.execute(command); + } + + public void shutdown() + { + // Do nothing. + } + + public List<Runnable> shutdownNow() + { + return Collections.emptyList(); + } + + public boolean isShutdown() + { + return delegate.isShutdown(); + } + + public boolean isTerminated() + { + return delegate.isTerminated(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException + { + return delegate.awaitTermination(timeout, unit); + } + + public <T> Future<T> submit(Callable<T> task) + { + return delegate.submit(task); + } + + public <T> Future<T> submit(Runnable task, T result) + { + return delegate.submit(task, result); + } + + public Future<?> submit(Runnable task) + { + return delegate.submit(task); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException + { + return delegate.invokeAll(tasks); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException + { + return delegate.invokeAll(tasks, timeout, unit); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException + { + return delegate.invokeAny(tasks); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.invokeAny(tasks, timeout, unit); + } +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java index 965deb6b6b..681d60dc0c 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java @@ -20,8 +20,6 @@ import org.eclipse.net4j.util.lifecycle.LifecycleState; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; /** * @author Eike Stepper @@ -40,87 +38,70 @@ public class ExecutorServiceFactory extends Factory super(PRODUCT_GROUP, TYPE); } - public ExecutorService create(String threadGroupName) + public ExecutorService create(String description) { - if (threadGroupName == null) - { - threadGroupName = DEFAULT_THREAD_GROUP_NAME; - } - - final ThreadGroup threadGroup = new ThreadGroup(threadGroupName); - ThreadFactory threadFactory = new ThreadFactory() - { - private int num; - - public Thread newThread(Runnable r) - { - Thread thread = new Thread(threadGroup, r, threadGroup.getName() + "-Thread-" + ++num); - thread.setDaemon(true); - return thread; - } - }; - - final ExecutorService executorService = Executors.newCachedThreadPool(threadFactory); + final ExecutorService executorService = ThreadPool.create(description); + return LifecycleUtil.delegateLifecycle(getClass().getClassLoader(), executorService, ExecutorService.class, new ILifecycle() - { - private boolean active; - - public void activate() throws LifecycleException - { - active = true; - } - - public Exception deactivate() - { - try - { - executorService.shutdown(); - active = false; - return null; - } - catch (Exception ex) { - return ex; - } - } - - public LifecycleState getLifecycleState() - { - return active ? LifecycleState.ACTIVE : LifecycleState.INACTIVE; - } - - public boolean isActive() - { - return active; - } - - public void addListener(IListener listener) - { - // Do nothing - } - - public void removeListener(IListener listener) - { - // Do nothing - } - - public IListener[] getListeners() - { - return EventUtil.NO_LISTENERS; - } - - public boolean hasListeners() - { - return false; - } - - @Override - public String toString() - { - return "CachedThreadPool"; - } - }); + private boolean active; + + public void activate() throws LifecycleException + { + active = true; + } + + public Exception deactivate() + { + try + { + executorService.shutdown(); + active = false; + return null; + } + catch (Exception ex) + { + return ex; + } + } + + public LifecycleState getLifecycleState() + { + return active ? LifecycleState.ACTIVE : LifecycleState.INACTIVE; + } + + public boolean isActive() + { + return active; + } + + public void addListener(IListener listener) + { + // Do nothing + } + + public void removeListener(IListener listener) + { + // Do nothing + } + + public IListener[] getListeners() + { + return EventUtil.NO_LISTENERS; + } + + public boolean hasListeners() + { + return false; + } + + @Override + public String toString() + { + return "CachedThreadPool"; + } + }); } public static ExecutorService get(IManagedContainer container) diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java new file mode 100644 index 0000000000..621ec5d51f --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.concurrent; + +import org.eclipse.net4j.util.lifecycle.Lifecycle; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.log.OMLogger; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executor; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public class ExecutorWorkSerializer extends Lifecycle implements IWorkSerializer +{ + private final Queue<Runnable> workQueue = new LinkedList<Runnable>(); + + private Executor executor; + + private volatile boolean working; + + private volatile boolean disposed; + + public ExecutorWorkSerializer() + { + } + + public ExecutorWorkSerializer(Executor executor) + { + this.executor = executor; + } + + public Executor getExecutor() + { + return executor; + } + + public void setExecutor(Executor executor) + { + checkInactive(); + this.executor = executor; + } + + public synchronized boolean addWork(Runnable runnable) + { + if (disposed) + { + return false; + } + + if (!working && isActive()) + { + startWork(runnable); + } + else + { + workQueue.add(runnable); + } + + return true; + } + + public synchronized void dispose() + { + LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG); + } + + @Override + public String toString() + { + return ExecutorWorkSerializer.class.getSimpleName(); + } + + protected void handleException(Runnable runnable, Throwable ex) + { + } + + protected void noWork() + { + } + + private void startWork(final Runnable runnable) + { + working = true; + if (!disposed) + { + executor.execute(new RunnableWithName() + { + public String getName() + { + if (runnable instanceof RunnableWithName) + { + return ((RunnableWithName)runnable).getName(); + } + + return null; + } + + public void run() + { + try + { + runnable.run(); + } + catch (Throwable ex) + { + try + { + handleException(runnable, ex); + } + catch (Throwable ignore) + { + //$FALL-THROUGH$ + } + } + + workDone(); + } + }); + } + } + + private synchronized void workDone() + { + Runnable runnable = workQueue.poll(); + if (runnable != null) + { + startWork(runnable); + } + else + { + noWork(); + working = false; + } + } + + @Override + protected void doBeforeActivate() throws Exception + { + super.doBeforeActivate(); + checkState(executor, "executor"); + } + + @Override + protected void doActivate() throws Exception + { + super.doActivate(); + workDone(); + } + + @Override + protected void doDeactivate() throws Exception + { + disposed = true; + working = false; + workQueue.clear(); + + super.doDeactivate(); + } +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IExecutorServiceProvider.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IExecutorServiceProvider.java new file mode 100644 index 0000000000..becd91b6c0 --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IExecutorServiceProvider.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.concurrent; + +import java.util.concurrent.ExecutorService; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public interface IExecutorServiceProvider +{ + public ExecutorService getExecutorService(); +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java index 2baf087a8b..03163efaf2 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java @@ -15,7 +15,9 @@ import org.eclipse.net4j.util.om.log.OMLogger; /** * @author Eike Stepper + * @deprecated As of 3.6 use {@link ExecutorWorkSerializer}. */ +@Deprecated public class QueueWorkerWorkSerializer extends QueueRunner implements IWorkSerializer { public QueueWorkerWorkSerializer() diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java new file mode 100644 index 0000000000..048de5aa4e --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.concurrent; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public interface RunnableWithName extends Runnable +{ + public String getName(); +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java new file mode 100644 index 0000000000..b172dc0598 --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java @@ -0,0 +1,384 @@ +/* + * Copyright (c) 2004-2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.concurrent; + +import org.eclipse.net4j.util.StringUtil; + +import java.lang.reflect.Method; +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionHandler +{ + public static final String DEFAULT_THREAD_GROUP_NAME = ExecutorServiceFactory.DEFAULT_THREAD_GROUP_NAME; + + public static final int DEFAULT_CORE_POOL_SIZE = 10; + + public static final int DEFAULT_MAXIMUM_POOL_SIZE = 100; + + public static final long DEFAULT_KEEP_ALIVE_SECONDS = 60; + + private static final Class<?> LINKED_BLOCKING_DEQUE_CLASS; + + private static final Method ADD_FIRST_METHOD; + + private final Executor defaultExecutor = new Executor() + { + public void execute(Runnable runnable) + { + ThreadPool.super.execute(runnable); + } + }; + + private final Executor namingExecutor = new Executor() + { + public void execute(Runnable runnable) + { + if (runnable instanceof RunnableWithName) + { + String name = ((RunnableWithName)runnable).getName(); + if (name != null) + { + Thread thread = new Thread(runnable, name); + thread.setDaemon(true); + thread.start(); + return; + } + } + + ThreadPool.super.execute(runnable); + } + }; + + private volatile Executor executor = defaultExecutor; + + public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, ThreadFactory threadFactory) + { + super(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, createWorkQueue(), threadFactory); + ((ThreadPool.WorkQueue)getQueue()).setThreadPool(this); + setRejectedExecutionHandler(this); + } + + public final void setNaming(boolean naming) + { + executor = naming ? namingExecutor : defaultExecutor; + } + + @Override + public void execute(final Runnable runnable) + { + executor.execute(runnable); + } + + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) + { + ((ThreadPool.WorkQueue)getQueue()).addFirst(runnable); + } + + public static ThreadPool create() + { + return create(null, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_SECONDS); + } + + public static ThreadPool create(String description) + { + String threadGroupName = null; + int corePoolSize = DEFAULT_CORE_POOL_SIZE; + int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE; + long keepAliveSeconds = DEFAULT_KEEP_ALIVE_SECONDS; + + if (!StringUtil.isEmpty(description)) + { + String[] tokens = description.split(":"); + if (tokens.length > 0) + { + threadGroupName = tokens[0]; + if (tokens.length > 1) + { + try + { + corePoolSize = Integer.parseInt(tokens[1]); + } + catch (NumberFormatException ex) + { + //$FALL-THROUGH$ + } + + if (tokens.length > 2) + { + try + { + maximumPoolSize = Integer.parseInt(tokens[2]); + } + catch (NumberFormatException ex) + { + //$FALL-THROUGH$ + } + + if (tokens.length > 3) + { + try + { + keepAliveSeconds = Long.parseLong(tokens[3]); + } + catch (NumberFormatException ex) + { + //$FALL-THROUGH$ + } + } + } + } + } + } + + return create(threadGroupName, corePoolSize, maximumPoolSize, keepAliveSeconds); + } + + public static ThreadPool create(String threadGroupName, int corePoolSize, int maximumPoolSize, long keepAliveSeconds) + { + ThreadFactory threadFactory = createThreadFactory(threadGroupName); + return new ThreadPool(corePoolSize, maximumPoolSize, keepAliveSeconds, threadFactory); + } + + private static ThreadFactory createThreadFactory(String threadGroupName) + { + if (threadGroupName == null) + { + threadGroupName = DEFAULT_THREAD_GROUP_NAME; + } + + final ThreadGroup threadGroup = new ThreadGroup(threadGroupName); + + ThreadFactory threadFactory = new ThreadFactory() + { + private final AtomicInteger num = new AtomicInteger(); + + public Thread newThread(Runnable r) + { + Thread thread = new Thread(threadGroup, r, threadGroup.getName() + "-thread-" + num.incrementAndGet()); + thread.setDaemon(true); + return thread; + } + }; + + return threadFactory; + } + + private static ThreadPool.WorkQueue createWorkQueue() + { + if (LINKED_BLOCKING_DEQUE_CLASS != null) + { + try + { + return new WorkQueueJRE16(); + } + catch (Throwable ex) + { + //$FALL-THROUGH$ + } + } + + return new WorkQueueJRE15(); + } + + static + { + Class<?> c = null; + Method m = null; + + try + { + c = Class.forName("java.util.concurrent.LinkedBlockingDeque"); + m = c.getMethod("addFirst", Object.class); + } + catch (Throwable ex) + { + c = null; + m = null; + } + + LINKED_BLOCKING_DEQUE_CLASS = c; + ADD_FIRST_METHOD = m; + } + + /** + * @author Eike Stepper + */ + private interface WorkQueue extends BlockingQueue<Runnable> + { + public void setThreadPool(ThreadPool threadPool); + + public void addFirst(Runnable runnable); + } + + /** + * @author Eike Stepper + */ + private static final class WorkQueueJRE15 extends LinkedBlockingQueue<Runnable>implements ThreadPool.WorkQueue + { + private static final long serialVersionUID = 1L; + + private ThreadPool threadPool; + + public WorkQueueJRE15() + { + } + + public void setThreadPool(ThreadPool threadPool) + { + this.threadPool = threadPool; + } + + public void addFirst(Runnable runnable) + { + super.offer(runnable); + } + + @Override + public boolean offer(Runnable runnable) + { + if (threadPool.getPoolSize() < threadPool.getMaximumPoolSize()) + { + return false; + } + + return super.offer(runnable); + } + } + + /** + * @author Eike Stepper + */ + private static final class WorkQueueJRE16 extends AbstractQueue<Runnable>implements ThreadPool.WorkQueue + { + private final BlockingQueue<Runnable> delegate = createDelegate(); + + private ThreadPool threadPool; + + public WorkQueueJRE16() + { + } + + public void setThreadPool(ThreadPool threadPool) + { + this.threadPool = threadPool; + } + + public void addFirst(Runnable runnable) + { + try + { + ADD_FIRST_METHOD.invoke(delegate, runnable); + } + catch (Throwable ex) + { + //$FALL-THROUGH$ + } + + delegate.offer(runnable); + } + + public boolean offer(Runnable r) + { + if (threadPool.getPoolSize() < threadPool.getMaximumPoolSize()) + { + return false; + } + + return delegate.offer(r); + } + + @Override + public int size() + { + return delegate.size(); + } + + public Runnable poll() + { + return delegate.poll(); + } + + @Override + public Iterator<Runnable> iterator() + { + return delegate.iterator(); + } + + public Runnable peek() + { + return delegate.peek(); + } + + public void put(Runnable e) throws InterruptedException + { + delegate.put(e); + } + + public boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException + { + return delegate.offer(e, timeout, unit); + } + + public Runnable take() throws InterruptedException + { + return delegate.take(); + } + + public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException + { + return delegate.poll(timeout, unit); + } + + public int remainingCapacity() + { + return delegate.remainingCapacity(); + } + + public int drainTo(Collection<? super Runnable> c) + { + return delegate.drainTo(c); + } + + public int drainTo(Collection<? super Runnable> c, int maxElements) + { + return delegate.drainTo(c, maxElements); + } + + @SuppressWarnings("unchecked") + private static BlockingQueue<Runnable> createDelegate() + { + try + { + return (BlockingQueue<Runnable>)LINKED_BLOCKING_DEQUE_CLASS.newInstance(); + } + catch (Throwable ex) + { + //$FALL-THROUGH$ + } + + return new LinkedBlockingQueue<Runnable>(); + } + } +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java index 120b3a00d1..d51e68da91 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java @@ -10,6 +10,8 @@ */ package org.eclipse.net4j.util.event; +import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider; + import java.util.concurrent.ExecutorService; /** @@ -20,7 +22,7 @@ import java.util.concurrent.ExecutorService; * @apiviz.exclude */ @Deprecated -public class ExecutorServiceNotifier extends Notifier +public class ExecutorServiceNotifier extends Notifier implements IExecutorServiceProvider { private ExecutorService notificationExecutorService; @@ -28,6 +30,14 @@ public class ExecutorServiceNotifier extends Notifier { } + /** + * @since 3.6 + */ + public ExecutorService getExecutorService() + { + return notificationExecutorService; + } + @Override public ExecutorService getNotificationService() { diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java index 3a93c37867..20f8d6a459 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java @@ -12,13 +12,14 @@ package org.eclipse.net4j.util.io; import org.eclipse.net4j.util.io.ExtendedIOUtil.ClassResolver; +import java.io.Closeable; import java.io.DataInput; import java.io.IOException; /** * @author Eike Stepper */ -public class DataInputExtender implements ExtendedDataInput +public class DataInputExtender implements ExtendedDataInput, Closeable { private DataInput input; @@ -142,4 +143,15 @@ public class DataInputExtender implements ExtendedDataInput { return input.skipBytes(n); } + + /** + * @since 3.6 + */ + public void close() throws IOException + { + if (input instanceof Closeable) + { + ((Closeable)input).close(); + } + } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java index c93eab8b8e..d6192e9e72 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java @@ -10,13 +10,14 @@ */ package org.eclipse.net4j.util.io; +import java.io.Closeable; import java.io.DataOutput; import java.io.IOException; /** * @author Eike Stepper */ -public class DataOutputExtender implements ExtendedDataOutput +public class DataOutputExtender implements ExtendedDataOutput, Closeable { private DataOutput output; @@ -125,4 +126,15 @@ public class DataOutputExtender implements ExtendedDataOutput { ExtendedIOUtil.writeException(output, t); } + + /** + * @since 3.6 + */ + public void close() throws IOException + { + if (output instanceof Closeable) + { + ((Closeable)output).close(); + } + } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java index f52c4e2983..01fbf088b7 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java @@ -12,6 +12,7 @@ package org.eclipse.net4j.util.io; import org.eclipse.net4j.util.io.ExtendedIOUtil.ClassResolver; +import java.io.Closeable; import java.io.DataInput; import java.io.EOFException; import java.io.IOException; @@ -48,7 +49,7 @@ public interface ExtendedDataInput extends DataInput * @author Eike Stepper * @since 2.0 */ - public static class Delegating implements ExtendedDataInput + public static class Delegating implements ExtendedDataInput, Closeable { private ExtendedDataInput delegate; @@ -177,6 +178,17 @@ public interface ExtendedDataInput extends DataInput { return delegate.skipBytes(n); } + + /** + * @since 3.6 + */ + public void close() throws IOException + { + if (delegate instanceof Closeable) + { + ((Closeable)delegate).close(); + } + } } /** @@ -209,5 +221,16 @@ public interface ExtendedDataInput extends DataInput return -1; } } + + @Override + public void close() throws IOException + { + if (delegate instanceof Closeable) + { + ((Closeable)delegate).close(); + } + + super.close(); + } } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java index f3d47ce989..74d86e83e1 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java @@ -10,6 +10,7 @@ */ package org.eclipse.net4j.util.io; +import java.io.Closeable; import java.io.DataOutput; import java.io.IOException; import java.io.OutputStream; @@ -41,7 +42,7 @@ public interface ExtendedDataOutput extends DataOutput * @author Eike Stepper * @since 2.0 */ - public static class Delegating implements ExtendedDataOutput + public static class Delegating implements ExtendedDataOutput, Closeable { private ExtendedDataOutput delegate; @@ -155,6 +156,17 @@ public interface ExtendedDataOutput extends DataOutput { delegate.writeUTF(str); } + + /** + * @since 3.6 + */ + public void close() throws IOException + { + if (delegate instanceof Closeable) + { + ((Closeable)delegate).close(); + } + } } /** @@ -180,5 +192,16 @@ public interface ExtendedDataOutput extends DataOutput { delegate.write(b); } + + @Override + public void close() throws IOException + { + if (delegate instanceof Closeable) + { + ((Closeable)delegate).close(); + } + + super.close(); + } } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/CleanableReferenceQueue.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/CleanableReferenceQueue.java new file mode 100644 index 0000000000..4991d90692 --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/CleanableReferenceQueue.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2011-2013, 2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + * Simon McDuff - bug 201266 + * Simon McDuff - bug 230832 + */ +package org.eclipse.net4j.util.ref; + +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public abstract class CleanableReferenceQueue<T> extends ReferenceQueue<T> +{ + public static final int ALL_WORK_PER_POLL = ReferenceQueueWorker.ALL_WORK_PER_POLL; + + public static final int DEFAULT_MAX_WORK_PER_POLL = ReferenceQueueWorker.DEFAULT_MAX_WORK_PER_POLL; + + public static final int DEFAULT_POLL_MILLIS = ReferenceQueueWorker.DEFAULT_POLL_MILLIS; + + private final AtomicBoolean cleaning = new AtomicBoolean(); + + private int maxWorkPerPoll; + + private long pollMillis; + + private long lastPoll = System.currentTimeMillis(); + + public CleanableReferenceQueue() + { + setPollMillis(DEFAULT_POLL_MILLIS); + setMaxWorkPerPoll(DEFAULT_MAX_WORK_PER_POLL); + } + + public final long getPollMillis() + { + return pollMillis; + } + + public final void setPollMillis(long pollMillis) + { + this.pollMillis = pollMillis; + } + + public final int getMaxWorkPerPoll() + { + return maxWorkPerPoll; + } + + public final void setMaxWorkPerPoll(int maxWorkPerPoll) + { + this.maxWorkPerPoll = maxWorkPerPoll; + } + + public final void register(T object) + { + clean(); + createReference(object); + } + + public final void clean() + { + if (cleaning.compareAndSet(false, true)) + { + long now = System.currentTimeMillis(); + if (lastPoll + pollMillis > now) + { + int count = maxWorkPerPoll; + if (count == ALL_WORK_PER_POLL) + { + count = Integer.MAX_VALUE; + } + + for (int i = 0; i < count; i++) + { + Reference<? extends T> reference = poll(); + if (reference == null) + { + break; + } + + cleanReference(reference); + } + + lastPoll = now; + } + + cleaning.set(false); + } + } + + protected abstract void cleanReference(Reference<? extends T> reference); + + protected abstract Reference<T> createReference(T object); +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java index 0fc0d05f0f..2598c12aec 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java @@ -25,17 +25,17 @@ public abstract class ReferenceQueueWorker<T> extends Worker /** * @since 3.0 */ - public static final int DEFAULT_POLL_MILLIS = 1000 * 60; // One minute + public static final int ALL_WORK_PER_POLL = -1; /** * @since 3.0 */ - public static final int ALL_WORK_PER_POLL = -1; + public static final int DEFAULT_MAX_WORK_PER_POLL = ALL_WORK_PER_POLL; /** * @since 3.0 */ - public static final int DEFAULT_MAX_WORK_PER_POLL = ALL_WORK_PER_POLL; + public static final int DEFAULT_POLL_MILLIS = 1000 * 60; // One minute private ReferenceQueue<T> queue = new ReferenceQueue<T>(); diff --git a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF index 5cdff1a0c1..67e3c0df64 100644 --- a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF @@ -1,7 +1,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-SymbolicName: org.eclipse.net4j;singleton:=true -Bundle-Version: 4.4.100.qualifier +Bundle-Version: 4.5.0.qualifier Bundle-Name: %pluginName Bundle-Vendor: %providerName Bundle-Localization: plugin @@ -11,7 +11,7 @@ Bundle-RequiredExecutionEnvironment: J2SE-1.5 Bundle-ClassPath: . Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resolution:=optional, org.eclipse.net4j.util;bundle-version="[3.0.0,4.0.0)";visibility:=reexport -Export-Package: org.eclipse.internal.net4j;version="4.4.100"; +Export-Package: org.eclipse.internal.net4j;version="4.5.0"; x-friends:="org.eclipse.net4j.http.server, org.eclipse.net4j.jvm, org.eclipse.net4j.tcp, @@ -20,7 +20,7 @@ Export-Package: org.eclipse.internal.net4j;version="4.4.100"; org.eclipse.net4j.http.tests, org.eclipse.net4j.tests, org.eclipse.net4j.defs", - org.eclipse.internal.net4j.buffer;version="4.4.100"; + org.eclipse.internal.net4j.buffer;version="4.5.0"; x-friends:="org.eclipse.net4j.http.server, org.eclipse.net4j.jvm, org.eclipse.net4j.tcp, @@ -29,17 +29,17 @@ Export-Package: org.eclipse.internal.net4j;version="4.4.100"; org.eclipse.net4j.http.tests, org.eclipse.net4j.tests, org.eclipse.net4j.defs", - org.eclipse.internal.net4j.bundle;version="4.4.100";x-internal:=true, - org.eclipse.net4j;version="4.4.100", - org.eclipse.net4j.acceptor;version="4.4.100", - org.eclipse.net4j.buffer;version="4.4.100", - org.eclipse.net4j.channel;version="4.4.100", - org.eclipse.net4j.connector;version="4.4.100", - org.eclipse.net4j.protocol;version="4.4.100", - org.eclipse.net4j.signal;version="4.4.100", - org.eclipse.net4j.signal.confirmation;version="4.4.100", - org.eclipse.net4j.signal.heartbeat;version="4.4.100", - org.eclipse.net4j.signal.security;version="4.4.100", - org.eclipse.net4j.signal.wrapping;version="4.4.100", - org.eclipse.spi.net4j;version="4.4.100" + org.eclipse.internal.net4j.bundle;version="4.5.0";x-internal:=true, + org.eclipse.net4j;version="4.5.0", + org.eclipse.net4j.acceptor;version="4.5.0", + org.eclipse.net4j.buffer;version="4.5.0", + org.eclipse.net4j.channel;version="4.5.0", + org.eclipse.net4j.connector;version="4.5.0", + org.eclipse.net4j.protocol;version="4.5.0", + org.eclipse.net4j.signal;version="4.5.0", + org.eclipse.net4j.signal.confirmation;version="4.5.0", + org.eclipse.net4j.signal.heartbeat;version="4.5.0", + org.eclipse.net4j.signal.security;version="4.5.0", + org.eclipse.net4j.signal.wrapping;version="4.5.0", + org.eclipse.spi.net4j;version="4.5.0" Eclipse-BuddyPolicy: registered diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/TransportConfig.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/TransportConfig.java index ec1ce20bb4..9f765705e6 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/TransportConfig.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/TransportConfig.java @@ -14,6 +14,7 @@ import org.eclipse.net4j.ITransportConfig; import org.eclipse.net4j.buffer.IBufferProvider; import org.eclipse.net4j.channel.IChannel; import org.eclipse.net4j.protocol.IProtocolProvider; +import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider; import org.eclipse.net4j.util.lifecycle.ILifecycle; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.security.INegotiator; @@ -24,7 +25,7 @@ import java.util.concurrent.ExecutorService; /** * @author Eike Stepper */ -public class TransportConfig implements ITransportConfig +public class TransportConfig implements ITransportConfig, IExecutorServiceProvider { private transient ILifecycle lifecycle; @@ -70,6 +71,11 @@ public class TransportConfig implements ITransportConfig this.lifecycle = lifecycle; } + public ExecutorService getExecutorService() + { + return receiveExecutor; + } + public ExecutorService getReceiveExecutor() { return receiveExecutor; diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/BufferPool.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/BufferPool.java index ff6a8f187c..f1efa170e3 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/BufferPool.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/BufferPool.java @@ -15,6 +15,7 @@ import org.eclipse.net4j.buffer.IBufferPool; import org.eclipse.net4j.buffer.IBufferProvider; import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; import org.eclipse.net4j.util.om.trace.ContextTracer; +import org.eclipse.net4j.util.ref.CleanableReferenceQueue; import org.eclipse.internal.net4j.bundle.OM; @@ -42,10 +43,27 @@ public class BufferPool extends BufferProvider implements IBufferPool.Introspect private final Queue<BufferRef> buffers = new ConcurrentLinkedQueue<BufferRef>(); @ExcludeFromDump - private final ReferenceQueue<IBuffer> referenceQueue = new ReferenceQueue<IBuffer>(); + private final CleanableReferenceQueue<IBuffer> referenceQueue = new CleanableReferenceQueue<IBuffer>() + { + @Override + protected Reference<IBuffer> createReference(IBuffer buffer) + { + return new BufferRef(buffer, this); + } - @ExcludeFromDump - private Monitor monitor; + @Override + protected void cleanReference(Reference<? extends IBuffer> reference) + { + if (buffers.remove(reference)) + { + --pooledBuffers; + if (TRACER.isEnabled()) + { + TRACER.trace("Collected buffer"); //$NON-NLS-1$ + } + } + } + }; public BufferPool(IBufferProvider provider) { @@ -169,22 +187,6 @@ public class BufferPool extends BufferProvider implements IBufferPool.Introspect ++pooledBuffers; } - @Override - protected void doActivate() throws Exception - { - super.doActivate(); - monitor = new Monitor(); - monitor.start(); - } - - @Override - protected void doDeactivate() throws Exception - { - monitor.interrupt(); - monitor = null; - super.doDeactivate(); - } - private static final class BufferRef extends SoftReference<IBuffer> { public BufferRef(IBuffer buffer, ReferenceQueue<IBuffer> queue) @@ -192,60 +194,4 @@ public class BufferPool extends BufferProvider implements IBufferPool.Introspect super(buffer, queue); } } - - private final class Monitor extends Thread - { - public Monitor() - { - setName("Net4jBufferPoolMonitor"); //$NON-NLS-1$ - setDaemon(true); - } - - @Override - public void run() - { - if (TRACER.isEnabled()) - { - TRACER.trace("Start monitoring"); //$NON-NLS-1$ - } - - try - { - while (isActive() && !isInterrupted()) - { - Reference<? extends IBuffer> bufferRef = pollQueue(); - if (bufferRef != null) - { - if (buffers.remove(bufferRef)) - { - --pooledBuffers; - if (TRACER.isEnabled()) - { - TRACER.trace("Collected buffer"); //$NON-NLS-1$ - } - } - } - } - } - catch (InterruptedException ex) - { - return; - } - finally - { - if (TRACER.isEnabled()) - { - TRACER.trace("Stop monitoring"); //$NON-NLS-1$ - } - } - } - - /** - * Factored out for better profiling. - */ - private Reference<? extends IBuffer> pollQueue() throws InterruptedException - { - return referenceQueue.remove(5000); - } - } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java index e3676f8794..fd1a9a33e6 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java @@ -165,6 +165,11 @@ public class BufferInputStream extends InputStream implements IBufferHandler buffers = null; currentBuffer = null; super.close(); + + if (ccam) + { + closeChannel(); + } } @Override @@ -219,6 +224,16 @@ public class BufferInputStream extends InputStream implements IBufferHandler return true; } + /** + * Subclasses may override. + * + * @since 4.5 + */ + protected void closeChannel() + { + // Do nothing. + } + private long computeTimeout(final long check) throws IOTimeoutException { long remaining; diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java index 52c99209bc..6fa3eb3cac 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java @@ -16,6 +16,7 @@ import org.eclipse.net4j.util.ReflectUtil; import org.eclipse.net4j.util.io.ExtendedDataInputStream; import org.eclipse.net4j.util.io.ExtendedDataOutputStream; import org.eclipse.net4j.util.io.IOTimeoutException; +import org.eclipse.net4j.util.io.IOUtil; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.om.trace.ContextTracer; @@ -194,6 +195,22 @@ public abstract class Signal implements Runnable } /** + * @since 4.5 + */ + protected boolean closeInputStreamAfterMe() + { + return true; + } + + /** + * @since 4.5 + */ + protected boolean closeOutputStreamAfterMe() + { + return true; + } + + /** * @since 2.0 */ protected InputStream getCurrentInputStream() @@ -265,6 +282,16 @@ public abstract class Signal implements Runnable } finally { + if (closeInputStreamAfterMe()) + { + IOUtil.closeSilent(bufferInputStream); + } + + if (closeOutputStreamAfterMe()) + { + IOUtil.closeSilent(bufferOutputStream); + } + protocol.stopSignal(this, exception); } } @@ -274,11 +301,19 @@ public abstract class Signal implements Runnable this.correlationID = correlationID; } + /** + * Transfers ownership of the passed stream to this signal. + * The signal closes the stream when done reading. + */ void setBufferInputStream(BufferInputStream inputStream) { bufferInputStream = inputStream; } + /** + * Transfers ownership of the passed stream to this signal. + * The signal closes the stream when done writing. + */ void setBufferOutputStream(BufferOutputStream outputStream) { bufferOutputStream = outputStream; diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java index ae28ab0f50..fdfa654432 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java @@ -18,7 +18,6 @@ import org.eclipse.net4j.channel.ChannelOutputStream; import org.eclipse.net4j.channel.IChannel; import org.eclipse.net4j.connector.IConnector; import org.eclipse.net4j.util.WrappedException; -import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; import org.eclipse.net4j.util.event.Event; import org.eclipse.net4j.util.event.IEvent; import org.eclipse.net4j.util.event.IListener; @@ -31,7 +30,6 @@ import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.internal.net4j.bundle.OM; -import org.eclipse.spi.net4j.InternalChannel; import org.eclipse.spi.net4j.Protocol; import java.io.IOException; @@ -41,7 +39,6 @@ import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutorService; /** * The default implementation of a {@link ISignalProtocol signal protocol}. @@ -635,24 +632,10 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> } @Override - public int read() throws IOException + protected void closeChannel() { - if (isCCAM()) - { - final InternalChannel channel = (InternalChannel)getChannel(); - - ExecutorService executorService = channel.getReceiveExecutor(); - executorService.submit(new Runnable() - { - public void run() - { - ConcurrencyUtil.sleep(500); - channel.close(); - } - }); - } - - return super.read(); + IChannel channel = getChannel(); + LifecycleUtil.deactivate(channel); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java index 451ee5d7d3..10dc6c0680 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java @@ -16,8 +16,10 @@ import org.eclipse.net4j.buffer.IBuffer; import org.eclipse.net4j.buffer.IBufferHandler; import org.eclipse.net4j.channel.IChannelMultiplexer; import org.eclipse.net4j.protocol.IProtocol; +import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer; +import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider; import org.eclipse.net4j.util.concurrent.IWorkSerializer; -import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer; +import org.eclipse.net4j.util.concurrent.RunnableWithName; import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer; import org.eclipse.net4j.util.event.Event; import org.eclipse.net4j.util.event.IListener; @@ -33,6 +35,7 @@ import org.eclipse.spi.net4j.InternalChannel.SendQueueEvent.Type; import java.text.MessageFormat; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -42,7 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @author Eike Stepper * @since 2.0 */ -public class Channel extends Lifecycle implements InternalChannel +public class Channel extends Lifecycle implements InternalChannel, IExecutorServiceProvider { private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CHANNEL, Channel.class); @@ -117,6 +120,14 @@ public class Channel extends Lifecycle implements InternalChannel this.id = id; } + /** + * @since 4.5 + */ + public ExecutorService getExecutorService() + { + return receiveExecutor; + } + public ExecutorService getReceiveExecutor() { return receiveExecutor; @@ -219,7 +230,9 @@ public class Channel extends Lifecycle implements InternalChannel } ++receivedBuffers; - receiveSerializer.addWork(createReceiverWork(buffer)); + + ReceiverWork receiverWork = createReceiverWork(buffer); + receiveSerializer.addWork(receiverWork); } else { @@ -273,13 +286,14 @@ public class Channel extends Lifecycle implements InternalChannel { super.doActivate(); sendQueue = new SendQueue(); - if (receiveExecutor == null) + if (receiveExecutor != null) { - receiveSerializer = new SynchronousWorkSerializer(); + receiveSerializer = new ReceiveSerializer2(receiveExecutor); + LifecycleUtil.activate(receiveSerializer); } else { - receiveSerializer = new ReceiveSerializer(); + receiveSerializer = new SynchronousWorkSerializer(); } } @@ -322,8 +336,10 @@ public class Channel extends Lifecycle implements InternalChannel * * @author Eike Stepper * @since 4.1 + * @deprecated As of 4.4 use {@link ExecutorWorkSerializer}. */ - protected class ReceiveSerializer extends QueueWorkerWorkSerializer + @Deprecated + protected class ReceiveSerializer extends org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer { @Override protected String getThreadName() @@ -345,8 +361,31 @@ public class Channel extends Lifecycle implements InternalChannel * If the meaning of this type isn't clear, there really should be more of a description here... * * @author Eike Stepper + * @since 4.4 + */ + private class ReceiveSerializer2 extends ExecutorWorkSerializer + { + public ReceiveSerializer2(Executor executor) + { + super(executor); + } + + @Override + protected void noWork() + { + if (isClosed()) + { + dispose(); + } + } + } + + /** + * If the meaning of this type isn't clear, there really should be more of a description here... + * + * @author Eike Stepper */ - protected class ReceiverWork implements Runnable + protected class ReceiverWork implements RunnableWithName { private final IBuffer buffer; @@ -358,6 +397,14 @@ public class Channel extends Lifecycle implements InternalChannel this.buffer = buffer; } + /** + * @since 4.5 + */ + public String getName() + { + return "Net4jReceiveSerializer-" + Channel.this; //$NON-NLS-1$ + } + public void run() { IBufferHandler receiveHandler = getReceiveHandler(); diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java index f3e3ece4e3..b570c88f70 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java @@ -14,6 +14,7 @@ import org.eclipse.net4j.buffer.IBufferProvider; import org.eclipse.net4j.channel.IChannel; import org.eclipse.net4j.protocol.IProtocol2; import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; +import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider; import org.eclipse.net4j.util.event.IListener; import org.eclipse.net4j.util.lifecycle.ILifecycle; import org.eclipse.net4j.util.lifecycle.Lifecycle; @@ -29,7 +30,8 @@ import java.util.concurrent.ExecutorService; * @author Eike Stepper * @since 2.0 */ -public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle implements IProtocol2<INFRA_STRUCTURE> +public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle + implements IProtocol2<INFRA_STRUCTURE>, IExecutorServiceProvider { private String type; |