Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2019-02-01 11:04:51 -0500
committerEike Stepper2019-02-01 11:04:51 -0500
commitddbf0ebab3920ef12645400a1b929d8accd97831 (patch)
tree5e384f10a49d84d3a0e50c927d444481c87616ee
parent3ff7e9362774a011ceb8e5937ab4f20e87fe27b0 (diff)
downloadcdo-ddbf0ebab3920ef12645400a1b929d8accd97831.tar.gz
cdo-ddbf0ebab3920ef12645400a1b929d8accd97831.tar.xz
cdo-ddbf0ebab3920ef12645400a1b929d8accd97831.zip
[544045] Various concurrency improvements (IWorkSerializer, ThreadPool, RWOLockManager)
https://bugs.eclipse.org/bugs/show_bug.cgi?id=544045
-rw-r--r--plugins/org.eclipse.emf.cdo.server.net4j/META-INF/MANIFEST.MF8
-rw-r--r--plugins/org.eclipse.emf.cdo.server.net4j/pom.xml2
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/LockingManager.java23
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/ServerCDOView.java6
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/TransactionCommitContext.java6
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/ISessionProtocol.java3
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionTest.java172
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_417825_Test.java3
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_417844_Test.java3
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java4
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java9
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java5
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOPushTransaction.java12
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/view/CDOView.java7
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java57
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java67
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java1
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java16
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java240
-rw-r--r--plugins/org.eclipse.net4j.util/.settings/.api_filters8
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java17
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java2
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IWorkSerializer.java2
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLockManager.java2
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWOLockManager.java111
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java4
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SerializingExecutor.java141
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/Sleeper.java2
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SynchronousExecutor.java39
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SynchronousWorkSerializer.java2
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java304
-rw-r--r--plugins/org.eclipse.net4j/.settings/.api_filters9
-rw-r--r--plugins/org.eclipse.net4j/META-INF/MANIFEST.MF32
-rw-r--r--plugins/org.eclipse.net4j/pom.xml2
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java11
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java2
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java12
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java4
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java24
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java38
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java66
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java11
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java8
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java3
44 files changed, 1197 insertions, 303 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/META-INF/MANIFEST.MF b/plugins/org.eclipse.emf.cdo.server.net4j/META-INF/MANIFEST.MF
index 19aade48a2..e8d148bac3 100644
--- a/plugins/org.eclipse.emf.cdo.server.net4j/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.emf.cdo.server.net4j/META-INF/MANIFEST.MF
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-SymbolicName: org.eclipse.emf.cdo.server.net4j;singleton:=true
-Bundle-Version: 4.1.500.qualifier
+Bundle-Version: 4.1.600.qualifier
Bundle-Name: %pluginName
Bundle-Vendor: %providerName
Bundle-Localization: plugin
@@ -12,7 +12,7 @@ Bundle-ClassPath: .
Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resolution:=optional,
org.eclipse.emf.cdo.server;bundle-version="[4.0.0,5.0.0)";visibility:=reexport,
org.eclipse.net4j;bundle-version="[4.0.0,5.0.0)";visibility:=reexport
-Export-Package: org.eclipse.emf.cdo.server.internal.net4j.bundle;version="4.1.500";x-internal:=true,
- org.eclipse.emf.cdo.server.internal.net4j.protocol;version="4.1.500";x-friends:="org.eclipse.emf.cdo.tests",
- org.eclipse.emf.cdo.server.net4j;version="4.1.500"
+Export-Package: org.eclipse.emf.cdo.server.internal.net4j.bundle;version="4.1.600";x-internal:=true,
+ org.eclipse.emf.cdo.server.internal.net4j.protocol;version="4.1.600";x-friends:="org.eclipse.emf.cdo.tests",
+ org.eclipse.emf.cdo.server.net4j;version="4.1.600"
Automatic-Module-Name: org.eclipse.emf.cdo.server.net4j
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/pom.xml b/plugins/org.eclipse.emf.cdo.server.net4j/pom.xml
index be415fc931..01f07ca9bd 100644
--- a/plugins/org.eclipse.emf.cdo.server.net4j/pom.xml
+++ b/plugins/org.eclipse.emf.cdo.server.net4j/pom.xml
@@ -25,7 +25,7 @@
<groupId>org.eclipse.emf.cdo</groupId>
<artifactId>org.eclipse.emf.cdo.server.net4j</artifactId>
- <version>4.1.500-SNAPSHOT</version>
+ <version>4.1.600-SNAPSHOT</version>
<packaging>eclipse-plugin</packaging>
</project>
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/LockingManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/LockingManager.java
index b03202f9f0..0a82be2b4c 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/LockingManager.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/LockingManager.java
@@ -71,6 +71,8 @@ import java.util.Set;
*/
public class LockingManager extends RWOLockManager<Object, IView> implements InternalLockManager
{
+ private static final LockType[] ALL_LOCK_TYPES = LockType.values();
+
private InternalRepository repository;
private Map<String, InternalView> openDurableViews = new HashMap<String, InternalView>();
@@ -234,14 +236,13 @@ public class LockingManager extends RWOLockManager<Object, IView> implements Int
}
@Override
- public void lock(org.eclipse.net4j.util.concurrent.IRWLockManager.LockType type, IView context, Collection<? extends Object> objectsToLock, long timeout)
- throws InterruptedException
+ public void lock(LockType type, IView context, Collection<? extends Object> objectsToLock, long timeout) throws InterruptedException
{
lock2(false, type, context, objectsToLock, false, timeout);
}
@Override
- public void lock(org.eclipse.net4j.util.concurrent.IRWLockManager.LockType type, IView context, Object objectToLock, long timeout) throws InterruptedException
+ public void lock(LockType type, IView context, Object objectToLock, long timeout) throws InterruptedException
{
Collection<Object> objectsToLock = new LinkedHashSet<Object>();
objectsToLock.add(objectToLock);
@@ -249,8 +250,8 @@ public class LockingManager extends RWOLockManager<Object, IView> implements Int
}
@Override
- public List<org.eclipse.net4j.util.concurrent.RWOLockManager.LockState<Object, IView>> lock2(org.eclipse.net4j.util.concurrent.IRWLockManager.LockType type,
- IView context, Collection<? extends Object> objectsToLock, long timeout) throws InterruptedException
+ public List<LockState<Object, IView>> lock2(LockType type, IView context, Collection<? extends Object> objectsToLock, long timeout)
+ throws InterruptedException
{
return lock2(false, type, context, objectsToLock, false, timeout);
}
@@ -342,22 +343,20 @@ public class LockingManager extends RWOLockManager<Object, IView> implements Int
}
@Override
- public synchronized List<org.eclipse.net4j.util.concurrent.RWOLockManager.LockState<Object, IView>> unlock2(IView context)
+ public synchronized List<RWOLockManager.LockState<Object, IView>> unlock2(IView context)
{
return unlock2(false, context);
}
@Override
- public synchronized List<org.eclipse.net4j.util.concurrent.RWOLockManager.LockState<Object, IView>> unlock2(IView context,
- Collection<? extends Object> objectsToUnlock)
+ public synchronized List<RWOLockManager.LockState<Object, IView>> unlock2(IView context, Collection<? extends Object> objectsToUnlock)
{
// If no locktype is specified, use the LockType.WRITE
return unlock2(false, LockType.WRITE, context, objectsToUnlock, false);
}
@Override
- public synchronized List<org.eclipse.net4j.util.concurrent.RWOLockManager.LockState<Object, IView>> unlock2(
- org.eclipse.net4j.util.concurrent.IRWLockManager.LockType type, IView context, Collection<? extends Object> objectsToUnlock)
+ public synchronized List<RWOLockManager.LockState<Object, IView>> unlock2(LockType type, IView context, Collection<? extends Object> objectsToUnlock)
{
return unlock2(false, type, context, objectsToUnlock, false);
}
@@ -369,7 +368,7 @@ public class LockingManager extends RWOLockManager<Object, IView> implements Int
}
@Override
- public synchronized void unlock(org.eclipse.net4j.util.concurrent.IRWLockManager.LockType type, IView context, Collection<? extends Object> objectsToUnlock)
+ public synchronized void unlock(LockType type, IView context, Collection<? extends Object> objectsToUnlock)
{
unlock2(type, context, objectsToUnlock);
}
@@ -846,7 +845,7 @@ public class LockingManager extends RWOLockManager<Object, IView> implements Int
LockGrade grade = LockGrade.NONE;
if (lockState != null)
{
- for (LockType type : LockType.values())
+ for (LockType type : ALL_LOCK_TYPES)
{
if (lockState.hasLock(type))
{
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/ServerCDOView.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/ServerCDOView.java
index eaaedbbca8..d2a8aaa9f4 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/ServerCDOView.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/ServerCDOView.java
@@ -159,8 +159,14 @@ public class ServerCDOView extends AbstractCDOView implements org.eclipse.emf.cd
// Do nothing
}
+ @Deprecated
public boolean isInvalidationRunnerActive()
{
+ return isInvalidating();
+ }
+
+ public boolean isInvalidating()
+ {
return false;
}
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/TransactionCommitContext.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/TransactionCommitContext.java
index 7eb8016c97..2273957b04 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/TransactionCommitContext.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/TransactionCommitContext.java
@@ -111,6 +111,8 @@ public class TransactionCommitContext implements InternalCommitContext
private static final InternalCDORevision DETACHED = new StubCDORevision(null);
+ private static final LockType[] ALL_LOCK_TYPES = LockType.values();
+
private final InternalTransaction transaction;
private final CDOBranch branch;
@@ -1100,7 +1102,7 @@ public class TransactionCommitContext implements InternalCommitContext
long timeout = repository.getOptimisticLockingTimeout();
// First lock all objects (incl. possible ref targets).
- // This is a transient operation, it does not check for existance!
+ // This is a transient operation, it does not check for existence!
lockManager.lock2(LockType.WRITE, transaction, lockedObjects, timeout);
}
catch (Exception ex)
@@ -1449,7 +1451,7 @@ public class TransactionCommitContext implements InternalCommitContext
}
LockState<Object, IView> postCommitLockState = null;
- for (LockType type : LockType.values())
+ for (LockType type : ALL_LOCK_TYPES)
{
if (lockState.isLocked(type, owner, false))
{
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/ISessionProtocol.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/ISessionProtocol.java
index 3fd57bb7fa..3fbb5c0910 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/ISessionProtocol.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/spi/server/ISessionProtocol.java
@@ -21,6 +21,7 @@ import org.eclipse.emf.cdo.common.protocol.CDOProtocol;
import org.eclipse.emf.cdo.session.remote.CDORemoteSessionMessage;
import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranch;
+import org.eclipse.net4j.util.event.INotifier;
import org.eclipse.net4j.util.security.DiffieHellman.Server.Challenge;
/**
@@ -31,7 +32,7 @@ import org.eclipse.net4j.util.security.DiffieHellman.Server.Challenge;
* @noextend This interface is not intended to be extended by clients.
* @noimplement This interface is not intended to be implemented by clients.
*/
-public interface ISessionProtocol extends CDOProtocol, IAuthenticationProtocol
+public interface ISessionProtocol extends CDOProtocol, IAuthenticationProtocol, INotifier
{
/**
* @since 4.0
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionTest.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionTest.java
index 4a191615d1..4dd978c44c 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionTest.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionTest.java
@@ -22,6 +22,8 @@ import org.eclipse.emf.cdo.session.CDOSession;
import org.eclipse.emf.cdo.spi.common.commit.CDOCommitInfoUtil;
import org.eclipse.emf.cdo.tests.model1.Category;
import org.eclipse.emf.cdo.tests.model1.Company;
+import org.eclipse.emf.cdo.tests.model1.Customer;
+import org.eclipse.emf.cdo.tests.model1.Model1Factory;
import org.eclipse.emf.cdo.tests.util.TestAdapter;
import org.eclipse.emf.cdo.transaction.CDOCommitContext;
import org.eclipse.emf.cdo.transaction.CDOPushTransaction;
@@ -30,8 +32,10 @@ import org.eclipse.emf.cdo.transaction.CDOTransactionConflictEvent;
import org.eclipse.emf.cdo.transaction.CDOTransactionHandler2;
import org.eclipse.emf.cdo.util.CDOUtil;
import org.eclipse.emf.cdo.util.CommitException;
+import org.eclipse.emf.cdo.util.ConcurrentAccessException;
import org.eclipse.emf.cdo.view.CDOView;
+import org.eclipse.net4j.signal.SignalCounter;
import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.event.IEvent;
import org.eclipse.net4j.util.event.IListener;
@@ -45,8 +49,10 @@ import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* See bug 213782, bug 201366
@@ -230,6 +236,156 @@ public class TransactionTest extends AbstractCDOTest
}
}
+ public void testCommitManyTransactionsMultiThread() throws Exception
+ {
+ final int RUNS = 1;
+ final int THREADS = 1000;
+ final int TIMEOUT = 10; // Minutes.
+ final boolean pessimistic = true;
+
+ CDOSession session = openSession();
+
+ CDOTransaction initialTransaction = session.openTransaction();
+ CDOResource resource = initialTransaction.createResource(getResourcePath("myResource"));
+
+ final Company initialCompany = getModel1Factory().createCompany();
+ resource.getContents().add(initialCompany);
+ initialTransaction.commit();
+
+ SignalCounter signalCounter = new SignalCounter(((CDONet4jSession)session).options().getNet4jProtocol());
+
+ for (int run = 1; run <= RUNS; run++)
+ {
+ System.out.println("RUN " + run);
+
+ AtomicInteger concurrentAccessExceptions = new AtomicInteger();
+ CountDownLatch latch = new CountDownLatch(THREADS);
+ List<Thread> threadList = new ArrayList<Thread>();
+
+ for (int thread = 0; thread < THREADS; thread++)
+ {
+ final CDOTransaction transaction = session.openTransaction();
+ // transaction.options().setCommitInfoTimeout(1000000);
+
+ final Company company = transaction.getObject(initialCompany);
+ final Customer newCustomer = Model1Factory.eINSTANCE.createCustomer();
+
+ threadList.add(new Committer(transaction, concurrentAccessExceptions, latch, new Callable<Boolean>()
+ {
+ public Boolean call() throws Exception
+ {
+ if (pessimistic)
+ {
+ CDOUtil.getCDOObject(company).cdoWriteLock().lock(TIMEOUT, TimeUnit.MINUTES);
+ }
+
+ company.getCustomers().add(newCustomer);
+ return true;
+ }
+ }));
+ }
+
+ for (Thread thread : threadList)
+ {
+ thread.start();
+ }
+
+ if (!latch.await(TIMEOUT, TimeUnit.MINUTES))
+ {
+ fail("Timeout after " + TIMEOUT + " seconds");
+ }
+
+ System.out.println("ConcurrentAccessExceptions: " + concurrentAccessExceptions.get());
+ signalCounter.dump(IOUtil.OUT(), true);
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private static final class Committer extends Thread
+ {
+ private static final ThreadGroup THREAD_GROUP = new ThreadGroup("COMMITTERS");
+
+ private static final int ATTEMPTS = 200;
+
+ private final CDOTransaction transaction;
+
+ private final AtomicInteger concurrentAccessExceptions;
+
+ private final CountDownLatch latch;
+
+ private final Callable<Boolean> operation;
+
+ private Callable<Boolean> callable = new Callable<Boolean>()
+ {
+ private int attempt;
+
+ public Boolean call() throws Exception
+ {
+ ++attempt;
+ operation.call();
+
+ try
+ {
+ transaction.commit();
+ }
+ catch (ConcurrentAccessException ex)
+ {
+ concurrentAccessExceptions.incrementAndGet();
+
+ if (attempt < ATTEMPTS)
+ {
+ transaction.rollback();
+ return true;
+ }
+ }
+ catch (Exception ex)
+ {
+ throw ex;
+ }
+
+ return false;
+ }
+ };
+
+ public Committer(CDOTransaction transaction, AtomicInteger concurrentAccessExceptions, CountDownLatch latch, Callable<Boolean> operation)
+ {
+ super(THREAD_GROUP, "Committer-" + transaction.getViewID());
+ this.transaction = transaction;
+ this.concurrentAccessExceptions = concurrentAccessExceptions;
+ this.latch = latch;
+ this.operation = operation;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (transaction.syncExec(callable))
+ {
+ // Do nothing.
+ }
+ }
+ catch (Exception ex)
+ {
+ System.out.println(ex.getClass().getName() + " --> " + ex.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ transaction.close();
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ }
+ }
+ }
+
public void testPushModeNewObjects() throws Exception
{
IOUtil.OUT().println("Creating category1");
@@ -641,4 +797,20 @@ public class TransactionTest extends AbstractCDOTest
company.setName("ABC");
transaction.commit();
}
+ //
+ // public static <V> V syncCommit(CDOTransaction transaction, int commitAttempts, EObject object, Transactional<V>
+ // transactional)
+ // throws ConcurrentAccessException, CommitException
+ // {
+ // int xxx;
+ // return null;
+ // }
+ //
+ // /**
+ // * @author Eike Stepper
+ // */
+ // public interface Transactional<S>
+ // {
+ //
+ // }
}
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_417825_Test.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_417825_Test.java
index cefd1a5581..6032bbb131 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_417825_Test.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_417825_Test.java
@@ -16,6 +16,7 @@ import org.eclipse.emf.internal.cdo.session.CDOSessionImpl;
import org.eclipse.emf.internal.cdo.session.SessionUtil;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
/**
* Bug 417825 - Invalidator can die if CDOSession can not be activated within 100ms.
@@ -39,7 +40,7 @@ public class Bugzilla_417825_Test extends AbstractCDOTest
{
CDOSessionImpl session2 = (CDOSessionImpl)openSession();
assertEquals(true, session2.isActive());
- assertEquals(true, session2.getInvalidator().isActive());
+ assertEquals(true, LifecycleUtil.isActive(session2.getInvalidator()));
}
finally
{
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_417844_Test.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_417844_Test.java
index 14b34b6291..c3f375ca51 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_417844_Test.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_417844_Test.java
@@ -19,6 +19,7 @@ import org.eclipse.emf.internal.cdo.view.CDOViewImpl;
import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
/**
* Bug 417844 - InvalidationRunner can die if invalidations come too early.
@@ -53,7 +54,7 @@ public class Bugzilla_417844_Test extends AbstractCDOTest
{
CDOViewImpl view = (CDOViewImpl)session.openView();
assertEquals(true, view.isActive());
- assertEquals(true, view.getInvalidationRunner().isActive());
+ assertEquals(true, LifecycleUtil.isActive(view.getInvalidator()));
}
finally
{
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java
index 0c66fd3ddd..aff2dad2a4 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/Config.java
@@ -26,7 +26,9 @@ public abstract class Config implements IConfig
{
private static final long serialVersionUID = 1L;
- protected static ExecutorService executorService = ThreadPool.create("test", 10, 1000, 10);
+ protected static final int MAX_THREADS_PER_POOL = 10000;
+
+ protected static ExecutorService executorService = ThreadPool.create("test", 10, MAX_THREADS_PER_POOL, 10);
private String name;
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java
index 4d4c3460f8..3d76e460d6 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java
@@ -74,6 +74,7 @@ import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.concurrent.DelegatingExecutorService;
import org.eclipse.net4j.util.concurrent.ExecutorServiceFactory;
+import org.eclipse.net4j.util.concurrent.ThreadPool;
import org.eclipse.net4j.util.container.ContainerUtil;
import org.eclipse.net4j.util.container.IManagedContainer;
import org.eclipse.net4j.util.container.IPluginContainer;
@@ -134,12 +135,16 @@ public abstract class RepositoryConfig extends Config implements IRepositoryConf
private static final boolean LOG_MULTI_VIEW_COMMIT = false;
- private static final Boolean enableServerBrowser = Boolean.getBoolean("org.eclipse.emf.cdo.tests.config.impl.RepositoryConfig.enableServerBrowser");
+ private static final boolean enableServerBrowser = Boolean.getBoolean("org.eclipse.emf.cdo.tests.config.impl.RepositoryConfig.enableServerBrowser");
+
+ private static final boolean useGlobalThreadPool = Boolean.getBoolean("org.eclipse.emf.cdo.tests.config.impl.RepositoryConfig.useGlobalThreadPool");
private static final long serialVersionUID = 1L;
protected static IManagedContainer serverContainer;
+ protected static ExecutorService serverThreadPool = useGlobalThreadPool ? executorService : ThreadPool.create("server", 10, MAX_THREADS_PER_POOL, 10);
+
protected static Map<String, InternalRepository> repositories;
private static String lastRepoProps;
@@ -348,7 +353,7 @@ public abstract class RepositoryConfig extends Config implements IRepositoryConf
@Override
public ExecutorService create(String threadGroupName)
{
- return new DelegatingExecutorService(executorService);
+ return new DelegatingExecutorService(serverThreadPool);
}
});
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java
index 6bf0e27140..b7cd258b4e 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/SessionConfig.java
@@ -624,6 +624,11 @@ public abstract class SessionConfig extends Config implements ISessionConfig
public void setUp() throws Exception
{
super.setUp();
+ if (!usesServerContainer())
+ {
+ JVMUtil.prepareContainer(getServerContainer());
+ }
+
JVMUtil.prepareContainer(getClientContainer());
}
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOPushTransaction.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOPushTransaction.java
index c13a8017e8..dd75e242b0 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOPushTransaction.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOPushTransaction.java
@@ -470,10 +470,20 @@ public class CDOPushTransaction extends Notifier implements CDOTransaction
/**
* @since 4.0
+ * @deprecated
*/
+ @Deprecated
public boolean isInvalidationRunnerActive()
{
- return delegate.isInvalidationRunnerActive();
+ return isInvalidating();
+ }
+
+ /**
+ * @since 4.7
+ */
+ public boolean isInvalidating()
+ {
+ return delegate.isInvalidating();
}
public Map<CDOID, CDOObject> getNewObjects()
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/view/CDOView.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/view/CDOView.java
index 5bc6660c57..09fb92543f 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/view/CDOView.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/view/CDOView.java
@@ -222,8 +222,15 @@ public interface CDOView extends CDOCommonView, CDOUpdatable, CDOCommitHistory.P
public boolean setTimeStamp(long timeStamp, IProgressMonitor monitor);
/**
+ * @since 4.7
+ */
+ public boolean isInvalidating();
+
+ /**
* @since 4.0
+ * @deprecated As of 4.7 use {@link #isInvalidating()}.
*/
+ @Deprecated
public boolean isInvalidationRunnerActive();
/**
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java
index a55cec5990..c7f45506b6 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java
@@ -97,13 +97,13 @@ import org.eclipse.net4j.util.AdapterUtil;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
-import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.concurrent.IRWLockManager;
import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType;
import org.eclipse.net4j.util.concurrent.IRWOLockManager;
import org.eclipse.net4j.util.concurrent.RWOLockManager;
import org.eclipse.net4j.util.concurrent.RunnableWithName;
+import org.eclipse.net4j.util.concurrent.SerializingExecutor;
import org.eclipse.net4j.util.event.Event;
import org.eclipse.net4j.util.event.EventUtil;
import org.eclipse.net4j.util.event.IEvent;
@@ -154,6 +154,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
@@ -207,7 +208,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
}
};
- private Invalidator invalidator = new Invalidator();
+ private final SessionInvalidator invalidator = new SessionInvalidator();
private CDORepositoryInfo repositoryInfo;
@@ -1112,7 +1113,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
invalidator.reorderInvalidations(invalidationData);
}
- public ILifecycle getInvalidator()
+ public Executor getInvalidator()
{
return invalidator;
}
@@ -1371,9 +1372,6 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
protected void doAfterActivate() throws Exception
{
super.doAfterActivate();
-
- ExecutorService executorService = ConcurrencyUtil.getExecutorService(sessionProtocol);
- invalidator.setExecutor(executorService);
LifecycleUtil.activate(invalidator);
}
@@ -1845,19 +1843,17 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
/**
* @author Eike Stepper
*/
- private class Invalidator extends ExecutorWorkSerializer
+ private final class SessionInvalidator extends SerializingExecutor
{
private static final boolean DEBUG = false;
private final Set<Object> unfinishedLocalCommits = new HashSet<Object>();
- private final List<Invalidation> reorderQueue = new ArrayList<Invalidation>();
-
- private boolean terminateIfSessionClosed;
+ private final List<SessionInvalidation> reorderQueue = new ArrayList<SessionInvalidation>();
private int lastCommitNumber;
- public Invalidator()
+ public SessionInvalidator()
{
}
@@ -1891,7 +1887,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
public synchronized void reorderInvalidations(InvalidationData invalidationData)
{
- Invalidation invalidation = new Invalidation(invalidationData);
+ SessionInvalidation invalidation = new SessionInvalidation(invalidationData);
reorderQueue.add(invalidation);
Collections.sort(reorderQueue);
@@ -1909,12 +1905,12 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
{
while (!reorderQueue.isEmpty() && canProcess(reorderQueue.get(0)))
{
- Invalidation invalidation0 = reorderQueue.remove(0);
- addWork(invalidation0);
+ SessionInvalidation invalidation0 = reorderQueue.remove(0);
+ execute(invalidation0);
}
}
- protected boolean canProcess(Invalidation invalidation)
+ protected boolean canProcess(SessionInvalidation invalidation)
{
if (options().isPassiveUpdateEnabled())
{
@@ -1925,34 +1921,18 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
return unfinishedLocalCommits.size() == 1; // Ourselves
}
-
- @Override
- protected void noWork()
- {
- if (isClosed() && terminateIfSessionClosed)
- {
- dispose();
- }
- }
-
- @Override
- protected void doAfterActivate() throws Exception
- {
- super.doAfterActivate();
- terminateIfSessionClosed = true;
- }
}
/**
* @author Eike Stepper
*/
- private final class Invalidation extends RunnableWithName implements Comparable<Invalidation>
+ private final class SessionInvalidation extends RunnableWithName implements Comparable<SessionInvalidation>
{
private final InvalidationData invalidationData;
private final CDOCommitInfo commitInfo;
- public Invalidation(InvalidationData invalidationData)
+ public SessionInvalidation(InvalidationData invalidationData)
{
this.invalidationData = invalidationData;
commitInfo = invalidationData.getCommitInfo();
@@ -1968,7 +1948,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
return commitInfo.getPreviousTimeStamp();
}
- public int compareTo(Invalidation o)
+ public int compareTo(SessionInvalidation o)
{
return CDOCommonUtil.compareTimeStamps(getTimeStamp(), o.getTimeStamp());
}
@@ -1990,7 +1970,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
{
long timeStamp = getTimeStamp();
- if (Invalidator.DEBUG)
+ if (SessionInvalidator.DEBUG)
{
IOUtil.OUT().println(CDOSessionImpl.this + " [" + getLastUpdateTime() % 10000 + "] " + timeStamp % 10000 + " INVALIDATE");
}
@@ -2025,7 +2005,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
if (success)
{
- fireEvent(new InvalidationEvent(sender, commitInfo, invalidationData.getSecurityImpact(), oldPermissions));
+ fireEvent(new SessionInvalidationEvent(sender, commitInfo, invalidationData.getSecurityImpact(), oldPermissions));
fireEvent(new SessionLocksChangedEvent(sender, lockChangeInfo));
commitInfoManager.notifyCommitInfoHandlers(commitInfo);
@@ -2243,7 +2223,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
/**
* @author Eike Stepper
*/
- private final class InvalidationEvent extends Event implements InternalCDOSessionInvalidationEvent
+ private final class SessionInvalidationEvent extends Event implements InternalCDOSessionInvalidationEvent
{
private static final long serialVersionUID = 1L;
@@ -2255,7 +2235,8 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme
private Map<CDORevision, CDOPermission> oldPermissions;
- public InvalidationEvent(InternalCDOTransaction sender, CDOCommitInfo commitInfo, byte securityImpact, Map<CDORevision, CDOPermission> oldPermissions)
+ public SessionInvalidationEvent(InternalCDOTransaction sender, CDOCommitInfo commitInfo, byte securityImpact,
+ Map<CDORevision, CDOPermission> oldPermissions)
{
super(CDOSessionImpl.this);
this.sender = sender;
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java
index 00d8a84fc1..40cf5f71ae 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/view/CDOViewImpl.java
@@ -75,11 +75,10 @@ import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.collection.HashBag;
import org.eclipse.net4j.util.collection.Pair;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
-import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType;
-import org.eclipse.net4j.util.concurrent.IWorkSerializer;
import org.eclipse.net4j.util.concurrent.RunnableWithName;
+import org.eclipse.net4j.util.concurrent.SerializingExecutor;
import org.eclipse.net4j.util.container.Container;
import org.eclipse.net4j.util.event.IEvent;
import org.eclipse.net4j.util.event.IListener;
@@ -160,9 +159,9 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
private Map<CDOObject, CDOLockState> lockStates = new WeakHashMap<CDOObject, CDOLockState>();
- private ExecutorWorkSerializer invalidationRunner = createInvalidationRunner();
+ private ViewInvalidator invalidator = new ViewInvalidator();
- private volatile boolean invalidationRunnerActive;
+ private volatile boolean invalidating;
/**
* @since 2.0
@@ -1164,8 +1163,8 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
{
if (invalidationData.isAsync())
{
- IWorkSerializer serializer = getInvalidationRunner();
- serializer.addWork(new InvalidationRunnable(invalidationData));
+ ViewInvalidation work = new ViewInvalidation(invalidationData);
+ invalidator.execute(work);
}
else
{
@@ -1262,29 +1261,20 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
}
}
- public ExecutorWorkSerializer getInvalidationRunner()
+ public ViewInvalidator getInvalidator()
{
- return invalidationRunner;
+ return invalidator;
}
- private ExecutorWorkSerializer createInvalidationRunner()
+ @Deprecated
+ public boolean isInvalidationRunnerActive()
{
- return new ExecutorWorkSerializer()
- {
- @Override
- protected void noWork()
- {
- if (isClosed())
- {
- dispose();
- }
- }
- };
+ return isInvalidating();
}
- public boolean isInvalidationRunnerActive()
+ public boolean isInvalidating()
{
- return invalidationRunnerActive;
+ return invalidating;
}
private void sendInvalidationNotifications(Set<CDOObject> dirtyObjects, Set<CDOObject> detachedObjects)
@@ -1321,7 +1311,7 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
IListener[] listeners = getListeners();
if (listeners != null)
{
- fireEvent(new InvalidationEvent(timeStamp, revisionDeltas, detachedObjects), listeners);
+ fireEvent(new ViewInvalidationEvent(timeStamp, revisionDeltas, detachedObjects), listeners);
}
}
}
@@ -1609,12 +1599,12 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
{
super.doAfterActivate();
- ExecutorService executorService = ConcurrencyUtil.getExecutorService(session);
- invalidationRunner.setExecutor(executorService);
+ ExecutorService executorService = getExecutorService();
+ invalidator.setDelegate(executorService);
try
{
- LifecycleUtil.activate(invalidationRunner);
+ LifecycleUtil.activate(invalidator);
}
catch (LifecycleException ex)
{
@@ -1629,7 +1619,7 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
@Override
protected void doBeforeDeactivate() throws Exception
{
- // Detach viewset from the view
+ // Detach the view set from the view.
InternalCDOViewSet viewSet = getViewSet();
viewSet.remove(this);
@@ -1645,7 +1635,7 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
unitManager.deactivate();
CDOViewRegistryImpl.INSTANCE.deregister(this);
- LifecycleUtil.deactivate(invalidationRunner, OMLogger.Level.WARN);
+ LifecycleUtil.deactivate(invalidator, OMLogger.Level.WARN);
try
{
@@ -2768,11 +2758,18 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
/**
* @author Eike Stepper
*/
- private final class InvalidationRunnable extends RunnableWithName
+ private final class ViewInvalidator extends SerializingExecutor
+ {
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private final class ViewInvalidation extends RunnableWithName
{
private final ViewInvalidationData invalidationData;
- public InvalidationRunnable(ViewInvalidationData invalidationData)
+ public ViewInvalidation(ViewInvalidationData invalidationData)
{
this.invalidationData = invalidationData;
}
@@ -2780,7 +2777,7 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
@Override
public String getName()
{
- return "CDOViewInvalidationRunner-" + CDOViewImpl.this; //$NON-NLS-1$
+ return "Invalidator-" + CDOViewImpl.this; //$NON-NLS-1$
}
@Override
@@ -2788,7 +2785,7 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
{
try
{
- invalidationRunnerActive = true;
+ invalidating = true;
doInvalidate(invalidationData);
}
catch (Exception ex)
@@ -2800,7 +2797,7 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
}
finally
{
- invalidationRunnerActive = false;
+ invalidating = false;
}
}
}
@@ -2808,7 +2805,7 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
/**
* @author Simon McDuff
*/
- private final class InvalidationEvent extends Event implements CDOViewInvalidationEvent
+ private final class ViewInvalidationEvent extends Event implements CDOViewInvalidationEvent
{
private static final long serialVersionUID = 1L;
@@ -2818,7 +2815,7 @@ public class CDOViewImpl extends AbstractCDOView implements IExecutorServiceProv
private Set<CDOObject> detachedObjects;
- public InvalidationEvent(long timeStamp, Map<CDOObject, CDORevisionDelta> revisionDeltas, Set<CDOObject> detachedObjects)
+ public ViewInvalidationEvent(long timeStamp, Map<CDOObject, CDORevisionDelta> revisionDeltas, Set<CDOObject> detachedObjects)
{
this.timeStamp = timeStamp;
this.revisionDeltas = revisionDeltas;
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java
index 900371c9c2..a772e5a59c 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ControlChannel.java
@@ -66,7 +66,6 @@ public class ControlChannel extends Channel
{
setID(CONTROL_CHANNEL_INDEX);
setMultiplexer(connector);
- setReceiveExecutor(connector.getConfig().getReceiveExecutor());
setUserID(connector.getUserID());
}
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java
index 0518006ff1..a7e3fda1e1 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java
@@ -11,7 +11,7 @@
*/
package org.eclipse.net4j.util.tests;
-import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
+import org.eclipse.net4j.util.concurrent.SerializingExecutor;
import org.eclipse.net4j.util.io.IOUtil;
import java.util.Random;
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * A test for {@link ExecutorWorkSerializer}.
+ * A test for {@link SerializingExecutor}.
*
* @author Andre Dietisheim
*/
@@ -46,8 +46,8 @@ public class ExecutorWorkSerializerTest extends AbstractOMTest
/** The thread pool to execute the work unit producers in. */
private ExecutorService threadPool;
- /** The queue worker to submit the work units to. */
- private ExecutorWorkSerializer queueWorker;
+ /** The executor to submit the work units to. */
+ private SerializingExecutor serializer;
@Override
public void setUp()
@@ -56,14 +56,14 @@ public class ExecutorWorkSerializerTest extends AbstractOMTest
workConsumedLatch = new CountDownLatch(NUM_WORK);
threadPool = Executors.newFixedThreadPool(NUM_WORKPRODUCER_THREADS);
- queueWorker = new ExecutorWorkSerializer(threadPool);
- queueWorker.activate();
+ serializer = new SerializingExecutor(threadPool);
+ serializer.activate();
}
@Override
public void tearDown()
{
- queueWorker.dispose();
+ serializer.deactivate();
threadPool.shutdown();
}
@@ -165,7 +165,7 @@ public class ExecutorWorkSerializerTest extends AbstractOMTest
int currentWorkProduced;
while ((currentWorkProduced = workProduced.getAndIncrement()) < NUM_WORK)
{
- queueWorker.addWork(createWork(currentWorkProduced));
+ serializer.execute(createWork(currentWorkProduced));
Thread.sleep(random.nextInt(1000));
}
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java
index 88fcdadac8..6dacd1e72d 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java
@@ -12,9 +12,14 @@ package org.eclipse.net4j.util.tests;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.concurrent.ThreadPool;
+import org.eclipse.net4j.util.tests.ThreadPoolTest.TaskManager.Task;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A test for {@link ThreadPool}.
@@ -23,9 +28,9 @@ import java.util.concurrent.TimeUnit;
*/
public class ThreadPoolTest extends AbstractOMTest
{
- public void testThreadPool() throws Exception
+ public void testExceedMaximumPoolSize() throws Exception
{
- final ThreadPool pool = ThreadPool.create("test", 100, 200, 60);
+ final ThreadPool pool = ThreadPool.create("test", 10, 20, 60);
try
{
@@ -36,7 +41,7 @@ public class ThreadPoolTest extends AbstractOMTest
{
final int n = i;
msg("scheduling " + n);
- pool.submit(new Runnable()
+ pool.execute(new Runnable()
{
public void run()
{
@@ -48,11 +53,238 @@ public class ThreadPoolTest extends AbstractOMTest
}
latch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
- msg("FINISHED");
+ msg("FINISHED with largest pool size = " + pool.getLargestPoolSize());
}
finally
{
pool.shutdownNow();
}
}
+
+ public void testWithKeepAlive() throws Exception
+ {
+ runTest(1000);
+ }
+
+ public void testWithoutKeepAlive() throws Exception
+ {
+ runTest(0);
+ }
+
+ private void runTest(long keepAliveTime)
+ {
+ TaskManager taskManager = new TaskManager(10, 20, keepAliveTime);
+ int max = taskManager.getMaximumPoolSize();
+ int extra = 10;
+ int count = max + 10;
+
+ for (int run = 0; run < 10; run++)
+ {
+ System.out.println("RUN " + (run + 1));
+
+ Task[] tasks = taskManager.createTasks(count);
+ assertEquals(count, taskManager.getCreatedTasks());
+ assertEquals(0, taskManager.getScheduledTasks());
+ assertEquals(0, taskManager.getCurrentlyEnqueuedTasks());
+ assertEquals(0, taskManager.getFinishedTasks());
+
+ taskManager.schedule(tasks, 0, count);
+ assertEquals(count, taskManager.getScheduledTasks());
+ sleep(20);
+ assertEquals(max, taskManager.getStartedTasks());
+ assertEquals(0, taskManager.getFinishedTasks());
+ assertEquals(extra, taskManager.getCurrentlyEnqueuedTasks());
+
+ for (int i = 1; i <= extra; i++)
+ {
+ tasks[i - 1].finish();
+
+ sleep(10);
+ assertEquals(max + i, taskManager.getStartedTasks());
+ assertEquals(i, taskManager.getFinishedTasks());
+ assertEquals(extra - i, taskManager.getCurrentlyEnqueuedTasks());
+ }
+
+ assertEquals(count, taskManager.getStartedTasks());
+ assertEquals(extra, taskManager.getFinishedTasks());
+ assertEquals(0, taskManager.getCurrentlyEnqueuedTasks());
+
+ for (int i = extra; i < count; i++)
+ {
+ tasks[i].finish();
+ }
+
+ sleep(20);
+ assertEquals(count, taskManager.getStartedTasks());
+ assertEquals(count, taskManager.getFinishedTasks());
+ assertEquals(0, taskManager.getCurrentlyEnqueuedTasks());
+
+ taskManager.resetStatistics();
+ }
+ }
+
+ // public static void assertEquals(int expected, int actual)
+ // {
+ // try
+ // {
+ // Assert.assertEquals(expected, actual);
+ // }
+ // catch (RuntimeException ex)
+ // {
+ // ex.printStackTrace();
+ // }
+ // }
+
+ /**
+ * @author Eike Stepper
+ */
+ public static class TaskManager extends ThreadPool
+ {
+ private final AtomicInteger createdTasks = new AtomicInteger();
+
+ private final AtomicInteger scheduledTasks = new AtomicInteger();
+
+ private final AtomicInteger startedTasks = new AtomicInteger();
+
+ private final AtomicInteger finishedTasks = new AtomicInteger();
+
+ public TaskManager(int corePoolSize, int maximumPoolSize, long keepAliveTime)
+ {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, createThreadFactory());
+ }
+
+ public int getCreatedTasks()
+ {
+ return createdTasks.get();
+ }
+
+ public int getScheduledTasks()
+ {
+ return scheduledTasks.get();
+ }
+
+ public int getStartedTasks()
+ {
+ return startedTasks.get();
+ }
+
+ public int getFinishedTasks()
+ {
+ return finishedTasks.get();
+ }
+
+ public int getCurrentlyEnqueuedTasks()
+ {
+ return getQueue().size();
+ }
+
+ public int getInactiveWorkers()
+ {
+ return getPoolSize() - getActiveCount();
+ }
+
+ public void resetStatistics()
+ {
+ createdTasks.set(0);
+ scheduledTasks.set(0);
+ startedTasks.set(0);
+ finishedTasks.set(0);
+ }
+
+ public Task[] createTasks(int count)
+ {
+ Task[] result = new Task[count];
+ for (int i = 0; i < result.length; i++)
+ {
+ result[i] = new Task(this, i + 1);
+ }
+
+ return result;
+ }
+
+ public void schedule(Task[] tasks, int start, int end)
+ {
+ for (int i = start; i < end; i++)
+ {
+ Task task = tasks[i];
+ execute(task);
+ }
+ }
+
+ @Override
+ public void execute(Runnable command)
+ {
+ scheduledTasks.incrementAndGet();
+ super.execute(command);
+ }
+
+ private static ThreadFactory createThreadFactory()
+ {
+ final ThreadFactory factory = Executors.defaultThreadFactory();
+
+ return new ThreadFactory()
+ {
+ public Thread newThread(Runnable task)
+ {
+ System.out.println("Creating new worker");
+ return factory.newThread(task);
+ }
+ };
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ public static class Task extends CountDownLatch implements Runnable
+ {
+ private final TaskManager manager;
+
+ private final int id;
+
+ private AtomicBoolean used = new AtomicBoolean();
+
+ public Task(TaskManager manager, int id)
+ {
+ super(1);
+ this.manager = manager;
+ this.id = id;
+
+ manager.createdTasks.incrementAndGet();
+ }
+
+ public final void finish()
+ {
+ countDown();
+ }
+
+ public final void run()
+ {
+ if (!used.compareAndSet(false, true))
+ {
+ throw new IllegalStateException(this + " has already been used");
+ }
+
+ manager.startedTasks.incrementAndGet();
+ System.out.println("Running " + this);
+
+ try
+ {
+ await();
+ }
+ catch (Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ System.out.println("Finished " + this);
+ manager.finishedTasks.incrementAndGet();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Task " + id;
+ }
+ }
+ }
}
diff --git a/plugins/org.eclipse.net4j.util/.settings/.api_filters b/plugins/org.eclipse.net4j.util/.settings/.api_filters
index 669faac381..ba9ccf65cb 100644
--- a/plugins/org.eclipse.net4j.util/.settings/.api_filters
+++ b/plugins/org.eclipse.net4j.util/.settings/.api_filters
@@ -85,6 +85,14 @@
</message_arguments>
</filter>
</resource>
+ <resource path="src/org/eclipse/net4j/util/concurrent/RWOLockManager.java" type="org.eclipse.net4j.util.concurrent.RWOLockManager">
+ <filter id="336658481">
+ <message_arguments>
+ <message_argument value="org.eclipse.net4j.util.concurrent.RWOLockManager"/>
+ <message_argument value="ALL_LOCK_TYPES"/>
+ </message_arguments>
+ </filter>
+ </resource>
<resource path="src/org/eclipse/net4j/util/container/ContainerEvent.java" type="org.eclipse.net4j.util.container.ContainerEvent">
<filter id="576725006">
<message_arguments>
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java
index 6d6c94957f..af4d25064d 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java
@@ -11,6 +11,7 @@
package org.eclipse.net4j.util.concurrent;
import org.eclipse.net4j.util.container.IManagedContainer;
+import org.eclipse.net4j.util.io.IOUtil;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -123,4 +124,20 @@ public final class ConcurrencyUtil
{
thread.setName(name);
}
+
+ /**
+ * @since 3.9
+ */
+ public static void setThreadName(String name)
+ {
+ setThreadName(Thread.currentThread(), name);
+ }
+
+ /**
+ * @since 3.9
+ */
+ public static void log(String msg)
+ {
+ IOUtil.OUT().println(Thread.currentThread().getName() + ": " + msg);
+ }
}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java
index df62dfec09..bf49220719 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java
@@ -21,7 +21,9 @@ import java.util.concurrent.Executor;
/**
* @author Eike Stepper
* @since 3.6
+ * @deprecated As of 3.9 use an executor such as {@link SerializingExecutor}.
*/
+@Deprecated
public class ExecutorWorkSerializer extends Lifecycle implements IWorkSerializer
{
private final Queue<Runnable> workQueue = new LinkedList<Runnable>();
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IWorkSerializer.java
index d70c5dfe3a..4b1e80dc11 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IWorkSerializer.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IWorkSerializer.java
@@ -12,7 +12,9 @@ package org.eclipse.net4j.util.concurrent;
/**
* @author Eike Stepper
+ * @deprecated As of 3.9 use an executor such as {@link SerializingExecutor}.
*/
+@Deprecated
public interface IWorkSerializer
{
public boolean addWork(Runnable work);
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLockManager.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLockManager.java
index 89c049dcec..bed079dd41 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLockManager.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLockManager.java
@@ -32,7 +32,7 @@ import java.util.Set;
*
* @author Simon McDuff
* @since 2.0
- * @deprecated Use {@link RWOLockManager}
+ * @deprecated As of 3.2 use {@link RWOLockManager}.
*/
@Deprecated
public class RWLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWLockManager<OBJECT, CONTEXT>
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWOLockManager.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWOLockManager.java
index e84f07ecb0..8ac82be98d 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWOLockManager.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWOLockManager.java
@@ -42,7 +42,9 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
private static final ThreadLocal<Boolean> UNLOCK_ALL = new ThreadLocal<Boolean>();
- private final List<LockState<OBJECT, CONTEXT>> EMPTY_RESULT = Collections.emptyList();
+ private static final LockType[] ALL_LOCK_TYPES = LockType.values();
+
+ private final List<LockState<OBJECT, CONTEXT>> emptyResult = Collections.emptyList();
private final Map<OBJECT, LockState<OBJECT, CONTEXT>> objectToLockStateMap = createObjectToLocksMap();
@@ -63,9 +65,10 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
public List<LockState<OBJECT, CONTEXT>> lock2(LockType type, CONTEXT context, Collection<? extends OBJECT> objectsToLock, long timeout)
throws InterruptedException
{
- if (objectsToLock.isEmpty())
+ int count = objectsToLock.size();
+ if (count == 0)
{
- return EMPTY_RESULT;
+ return emptyResult;
}
if (TRACER.isEnabled())
@@ -79,8 +82,6 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
// Do not synchronize the entire method as it would corrupt the timeout!
synchronized (this)
{
- int count = objectsToLock.size();
-
for (;;)
{
ArrayList<LockState<OBJECT, CONTEXT>> lockStates = getLockStatesForContext(type, context, objectsToLock);
@@ -112,12 +113,12 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
unlock2(type, context, objectsToUnlock);
}
- public synchronized List<LockState<OBJECT, CONTEXT>> unlock2(CONTEXT context, Collection<? extends OBJECT> objectsToUnlock)
+ public List<LockState<OBJECT, CONTEXT>> unlock2(CONTEXT context, Collection<? extends OBJECT> objectsToUnlock)
{
- return unlock2(LockType.values(), context, objectsToUnlock);
+ return unlock2(ALL_LOCK_TYPES, context, objectsToUnlock);
}
- public synchronized List<LockState<OBJECT, CONTEXT>> unlock2(LockType type, CONTEXT context, Collection<? extends OBJECT> objectsToUnlock)
+ public List<LockState<OBJECT, CONTEXT>> unlock2(LockType type, CONTEXT context, Collection<? extends OBJECT> objectsToUnlock)
{
return unlock2(new LockType[] { type }, context, objectsToUnlock);
}
@@ -126,7 +127,7 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
{
if (objectsToUnlock.isEmpty())
{
- return EMPTY_RESULT;
+ return emptyResult;
}
if (TRACER.isEnabled())
@@ -135,42 +136,46 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
}
Set<LockState<OBJECT, CONTEXT>> result = new HashSet<LockState<OBJECT, CONTEXT>>();
+ boolean unlockAll = UNLOCK_ALL.get() == Boolean.TRUE;
- for (OBJECT o : objectsToUnlock)
+ synchronized (this)
{
- LockState<OBJECT, CONTEXT> lockState = objectToLockStateMap.get(o);
- if (lockState != null)
+ for (OBJECT o : objectsToUnlock)
{
- for (LockType type : types)
+ LockState<OBJECT, CONTEXT> lockState = objectToLockStateMap.get(o);
+ if (lockState != null)
{
- while (lockState.canUnlock(type, context))
+ for (LockType type : types)
{
- lockState.unlock(type, context);
- result.add(lockState);
-
- if (UNLOCK_ALL.get() != Boolean.TRUE)
+ while (lockState.canUnlock(type, context))
{
- break;
+ lockState.unlock(type, context);
+ result.add(lockState);
+
+ if (!unlockAll)
+ {
+ break;
+ }
}
}
}
}
- }
- for (LockState<OBJECT, CONTEXT> lockState : result)
- {
- if (!lockState.hasLocks(context))
+ for (LockState<OBJECT, CONTEXT> lockState : result)
{
- removeLockStateForContext(context, lockState);
- }
+ if (!lockState.hasLocks(context))
+ {
+ removeLockStateForContext(context, lockState);
+ }
- if (lockState.hasNoLocks())
- {
- objectToLockStateMap.remove(lockState.getLockedObject());
+ if (lockState.hasNoLocks())
+ {
+ objectToLockStateMap.remove(lockState.getLockedObject());
+ }
}
- }
- notifyAll();
+ notifyAll();
+ }
return new LinkedList<RWOLockManager.LockState<OBJECT, CONTEXT>>(result);
}
@@ -185,7 +190,7 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
Set<LockState<OBJECT, CONTEXT>> lockStates = contextToLockStates.get(context);
if (lockStates == null)
{
- return EMPTY_RESULT;
+ return emptyResult;
}
if (TRACER.isEnabled())
@@ -197,7 +202,7 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
for (LockState<OBJECT, CONTEXT> lockState : lockStates)
{
- for (LockType type : LockType.values())
+ for (LockType type : ALL_LOCK_TYPES)
{
while (lockState.hasLock(type, context, false))
{
@@ -225,23 +230,6 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
return toList(lockStates);
}
- @SuppressWarnings("unchecked")
- private List<LockState<OBJECT, CONTEXT>> toList(Set<LockState<OBJECT, CONTEXT>> lockStates)
- {
- if (lockStates instanceof List)
- {
- return (List<LockState<OBJECT, CONTEXT>>)lockStates;
- }
-
- List<LockState<OBJECT, CONTEXT>> list = new LinkedList<LockState<OBJECT, CONTEXT>>();
- for (LockState<OBJECT, CONTEXT> lockState : lockStates)
- {
- list.add(lockState);
- }
-
- return list;
- }
-
public synchronized boolean hasLock(LockType type, CONTEXT context, OBJECT objectToLock)
{
LockState<OBJECT, CONTEXT> lockState = objectToLockStateMap.get(objectToLock);
@@ -348,9 +336,12 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
private ArrayList<LockState<OBJECT, CONTEXT>> getLockStatesForContext(LockType type, CONTEXT context, Collection<? extends OBJECT> objectsToLock)
{
- ArrayList<LockState<OBJECT, CONTEXT>> lockStates = new ArrayList<LockState<OBJECT, CONTEXT>>(objectsToLock.size());
+ int count = objectsToLock.size();
+ ArrayList<LockState<OBJECT, CONTEXT>> lockStates = new ArrayList<LockState<OBJECT, CONTEXT>>(count);
+
Iterator<? extends OBJECT> it = objectsToLock.iterator();
- for (int i = 0; i < objectsToLock.size(); i++)
+
+ for (int i = 0; i < count; i++)
{
OBJECT o = it.next();
LockState<OBJECT, CONTEXT> lockState = getOrCreateLockState(o);
@@ -380,7 +371,7 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
/**
* Removes a lockState from the set of all lockStates that the given context is involved in. If the lockState being
* removed is the last one for the given context, then the set becomes empty, and is therefore removed from the
- * contextToLockStates mp.
+ * contextToLockStates map.
*/
private void removeLockStateForContext(CONTEXT context, LockState<OBJECT, CONTEXT> lockState)
{
@@ -409,7 +400,23 @@ public class RWOLockManager<OBJECT, CONTEXT> extends Lifecycle implements IRWOLo
wait(waitTime);
}
+ }
+ @SuppressWarnings("unchecked")
+ private static <OBJECT, CONTEXT> List<LockState<OBJECT, CONTEXT>> toList(Set<LockState<OBJECT, CONTEXT>> lockStates)
+ {
+ if (lockStates instanceof List)
+ {
+ return (List<LockState<OBJECT, CONTEXT>>)lockStates;
+ }
+
+ List<LockState<OBJECT, CONTEXT>> list = new LinkedList<LockState<OBJECT, CONTEXT>>();
+ for (LockState<OBJECT, CONTEXT> lockState : lockStates)
+ {
+ list.add(lockState);
+ }
+
+ return list;
}
/**
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java
index b3636496ce..cd47276207 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java
@@ -19,11 +19,11 @@ import org.eclipse.net4j.util.om.OMPlatform;
*/
public abstract class RunnableWithName implements Runnable
{
- private static final boolean DISABLE_RUNNABLE_NAMES = OMPlatform.INSTANCE.isProperty("org.eclipse.net4j.util.concurrent.DISABLE_RUNNABLE_NAMES");
+ private static final boolean ENABLE_RUNNABLE_NAMES = OMPlatform.INSTANCE.isProperty("org.eclipse.net4j.util.concurrent.ENABLE_RUNNABLE_NAMES");
public final void run()
{
- if (DISABLE_RUNNABLE_NAMES)
+ if (!ENABLE_RUNNABLE_NAMES)
{
doRun();
return;
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SerializingExecutor.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SerializingExecutor.java
new file mode 100644
index 0000000000..e3a64774ee
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SerializingExecutor.java
@@ -0,0 +1,141 @@
+package org.eclipse.net4j.util.concurrent;
+
+import org.eclipse.net4j.internal.util.bundle.OM;
+import org.eclipse.net4j.util.lifecycle.Lifecycle;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Eike Stepper
+ * @since 3.9
+ */
+public class SerializingExecutor extends Lifecycle implements Executor, Runnable
+{
+ private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>();
+
+ private final AtomicBoolean running = new AtomicBoolean();
+
+ private Executor delegate;
+
+ public SerializingExecutor(Executor delegate)
+ {
+ setDelegate(delegate);
+ }
+
+ public SerializingExecutor()
+ {
+ this(null);
+ }
+
+ public final Executor getDelegate()
+ {
+ return delegate;
+ }
+
+ public final void setDelegate(Executor delegate)
+ {
+ checkInactive();
+ this.delegate = delegate == null ? SynchronousExecutor.INSTANCE : delegate;
+ }
+
+ public final void execute(Runnable task)
+ {
+ tasks.add(task);
+
+ if (isActive())
+ {
+ schedule(task);
+ }
+ }
+
+ public final void run()
+ {
+ Runnable task;
+
+ try
+ {
+ while ((task = tasks.poll()) != null)
+ {
+ if (!isActive())
+ {
+ // Bypass trySchedule() below.
+ return;
+ }
+
+ try
+ {
+ task.run();
+ }
+ catch (RuntimeException ex)
+ {
+ handleFailedTask(task, ex);
+ }
+ }
+ }
+ finally
+ {
+ running.set(false);
+ }
+
+ trySchedule();
+ }
+
+ protected void handleFailedTask(Runnable task, Throwable failure)
+ {
+ OM.LOG.error("Execution of task failed: " + task, failure);
+ }
+
+ @Override
+ protected void doActivate() throws Exception
+ {
+ super.doActivate();
+ trySchedule();
+ }
+
+ @Override
+ protected void doDeactivate() throws Exception
+ {
+ tasks.clear();
+ super.doDeactivate();
+ }
+
+ private void trySchedule()
+ {
+ if (!tasks.isEmpty())
+ {
+ schedule(null);
+ }
+ }
+
+ private void schedule(Runnable task)
+ {
+ if (running.compareAndSet(false, true))
+ {
+ try
+ {
+ delegate.execute(this);
+ }
+ catch (RuntimeException ex)
+ {
+ cleanup(task);
+ }
+ catch (Error ex)
+ {
+ cleanup(task);
+ }
+ }
+ }
+
+ private void cleanup(Runnable task)
+ {
+ if (task != null)
+ {
+ tasks.remove(task);
+ }
+
+ running.set(false);
+ }
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/Sleeper.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/Sleeper.java
index 6a9e0c6309..5fa757050d 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/Sleeper.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/Sleeper.java
@@ -12,7 +12,9 @@ package org.eclipse.net4j.util.concurrent;
/**
* @author Eike Stepper
+ * @deprecated As of 3.9 subject to future removal.
*/
+@Deprecated
public class Sleeper
{
private static final int DEFAULT_INTERVAL = 10;
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SynchronousExecutor.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SynchronousExecutor.java
new file mode 100644
index 0000000000..f152c7bc9c
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SynchronousExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2004-2018 Eike Stepper (Loehne, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.net4j.util.concurrent;
+
+import java.util.concurrent.Executor;
+
+/**
+ * @author Eike Stepper
+ * @since 3.9
+ */
+public final class SynchronousExecutor implements Executor
+{
+ public static final Executor INSTANCE = new SynchronousExecutor();
+
+ private static final String NAME = SynchronousExecutor.class.getSimpleName() + ".INSTANCE";
+
+ private SynchronousExecutor()
+ {
+ }
+
+ public void execute(Runnable work)
+ {
+ work.run();
+ }
+
+ @Override
+ public String toString()
+ {
+ return NAME;
+ }
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SynchronousWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SynchronousWorkSerializer.java
index 35d2838c83..22d4e37cb0 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SynchronousWorkSerializer.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/SynchronousWorkSerializer.java
@@ -12,7 +12,9 @@ package org.eclipse.net4j.util.concurrent;
/**
* @author Eike Stepper
+ * @deprecated As of 3.9 use an executor such as {@link SynchronousExecutor}.
*/
+@Deprecated
public class SynchronousWorkSerializer implements IWorkSerializer
{
public SynchronousWorkSerializer()
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java
index f860b5e1cf..98a24cc68c 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java
@@ -10,15 +10,20 @@
*/
package org.eclipse.net4j.util.concurrent;
+import org.eclipse.net4j.internal.util.bundle.OM;
import org.eclipse.net4j.util.StringUtil;
+import org.eclipse.net4j.util.om.OMPlatform;
import java.lang.reflect.Method;
import java.util.AbstractQueue;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -40,18 +45,154 @@ public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionH
private static final Class<?> LINKED_BLOCKING_DEQUE_CLASS;
- private static final Method ADD_FIRST_METHOD;
+ private static final Method OFFER_LAST_METHOD;
+
+ private static final int NO_DEADLOCK_DETECTION = 0;
+
+ private static final int deadlockDetectionInterval = OMPlatform.INSTANCE.getProperty("org.eclipse.net4j.util.concurrent.ThreadPool.deadlockDetectionInterval",
+ NO_DEADLOCK_DETECTION);
+
+ private final AtomicInteger runningTasks = new AtomicInteger();
+
+ private final AtomicInteger runTasks = new AtomicInteger();
+
+ private int lastRunTasks = -1;
+
+ private RejectedExecutionHandler userHandler;
public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, ThreadFactory threadFactory)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, createWorkQueue(), threadFactory);
((WorkQueue)getQueue()).setThreadPool(this);
- setRejectedExecutionHandler(this);
+
+ // Call super setter because the setter in this class is overridden to set the userHandler field.
+ super.setRejectedExecutionHandler(this);
+
+ if (deadlockDetectionInterval != NO_DEADLOCK_DETECTION)
+ {
+ DeadlockDetector.INSTANCE.register(this);
+ }
+ }
+
+ @Override
+ public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
+ {
+ userHandler = handler;
+ }
+
+ @Override
+ public RejectedExecutionHandler getRejectedExecutionHandler()
+ {
+ return userHandler;
+ }
+
+ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
+ {
+ WorkQueue queue = (WorkQueue)getQueue();
+ if (!queue.offerLast(task))
+ {
+ if (userHandler != null)
+ {
+ userHandler.rejectedExecution(task, this);
+ }
+ else
+ {
+ OM.LOG.error("Thread pool has rejected the task " + task);
+ }
+ }
}
- public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor)
+ @Override
+ public int getActiveCount()
{
- ((WorkQueue)getQueue()).addFirst(runnable);
+ return runningTasks.get();
+ }
+
+ @Override
+ protected void beforeExecute(Thread worker, Runnable task)
+ {
+ runningTasks.incrementAndGet();
+ incrementRunTasks();
+ }
+
+ @Override
+ protected void afterExecute(Runnable task, Throwable ex)
+ {
+ runningTasks.decrementAndGet();
+ }
+
+ /**
+ * @since 3.9
+ */
+ protected void potentialDeadlockDetected()
+ {
+ BlockingQueue<Runnable> queue = getQueue();
+ int size = queue.size();
+ if (size > 0)
+ {
+ String poolName = toString();
+
+ ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MICROSECONDS, new SynchronousQueue<Runnable>());
+ Runnable task;
+ boolean first = true;
+
+ while ((task = queue.poll()) != null)
+ {
+ if (first)
+ {
+ OM.LOG.warn("Potential deadlock detected in " + poolName + ". Executing " + size + " tasks...");
+ first = false;
+ }
+
+ incrementRunTasks();
+ executor.execute(task);
+ }
+ }
+ }
+
+ private void incrementRunTasks()
+ {
+ int current;
+ int next;
+
+ do
+ {
+ current = runTasks.get();
+ next = current == Integer.MAX_VALUE ? 0 : current + 1;
+ } while (!runTasks.compareAndSet(current, next));
+ }
+
+ /**
+ * This method decides whether a new task will be added to the {@link WorkQueue} (and eventually picked up by
+ * an existing worker), or assigned to a new worker.
+ * <p>
+ * It is called from {@link WorkQueue#offer(Runnable)}, which, in turn, is called from {@link #execute(Runnable)}.
+ * When this method is called the core workers are already created, i.e., {@link #getPoolSize() pool size} >=
+ * {@link #getCorePoolSize() core pool size}.
+ * <p>
+ * Note that, due to the unsynchronized calls to the various metric-providing methods,
+ * it can happen that the thread pool will not be able to actually create a new worker at the time it is supposed
+ * to do it. In this case the {@link #rejectedExecution(Runnable, ThreadPoolExecutor) rejectedExecution()} method
+ * will be called, which, as a last resort, adds the new task to the work queue (even though here
+ * it was decided not to do so).
+ */
+ private boolean shallEnqueue()
+ {
+ int poolSize = getPoolSize();
+ if (getQueue().size() < poolSize - getActiveCount())
+ {
+ // More inactive workers exist than there are tasks in the queue; the task should be enqueued.
+ return true;
+ }
+
+ if (poolSize >= getMaximumPoolSize())
+ {
+ // Pool is full; the task should be enqueued.
+ return true;
+ }
+
+ // A new worker should be created.
+ return false;
}
public static ThreadPool create()
@@ -132,9 +273,9 @@ public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionH
{
private final AtomicInteger num = new AtomicInteger();
- public Thread newThread(Runnable r)
+ public Thread newThread(Runnable task)
{
- Thread thread = new Thread(threadGroup, r, threadGroup.getName() + "-thread-" + num.incrementAndGet());
+ Thread thread = new Thread(threadGroup, task, threadGroup.getName() + "-thread-" + num.incrementAndGet());
thread.setDaemon(true);
return thread;
}
@@ -168,7 +309,7 @@ public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionH
try
{
c = Class.forName("java.util.concurrent.LinkedBlockingDeque");
- m = c.getMethod("addFirst", Object.class);
+ m = c.getMethod("offerLast", Object.class);
}
catch (Throwable ex)
{
@@ -177,7 +318,7 @@ public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionH
}
LINKED_BLOCKING_DEQUE_CLASS = c;
- ADD_FIRST_METHOD = m;
+ OFFER_LAST_METHOD = m;
}
/**
@@ -187,7 +328,8 @@ public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionH
{
public void setThreadPool(ThreadPool threadPool);
- public void addFirst(Runnable runnable);
+ public boolean offerLast(Runnable task);
+
}
/**
@@ -208,23 +350,21 @@ public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionH
this.threadPool = threadPool;
}
- public void addFirst(Runnable runnable)
+ public boolean offerLast(Runnable task)
{
- super.offer(runnable);
+ // Call the super method because the method in this class is overridden.
+ return super.offer(task);
}
@Override
- public boolean offer(Runnable runnable)
+ public boolean offer(Runnable task)
{
- int poolSize = threadPool.getPoolSize();
- if (poolSize < threadPool.getMaximumPoolSize() && poolSize == threadPool.getActiveCount())
+ if (threadPool.shallEnqueue())
{
- // Do not enqueue the new runnable command if we can create a new worker thread and there's currently no idle
- // thread.
- return false;
+ return super.offer(task);
}
- return super.offer(runnable);
+ return false;
}
}
@@ -246,31 +386,33 @@ public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionH
this.threadPool = threadPool;
}
- public void addFirst(Runnable runnable)
+ public boolean offerLast(Runnable task)
{
try
{
- ADD_FIRST_METHOD.invoke(delegate, runnable);
+ // Call the LinkedBlockingDeque.offerLast() method because it does NOT call
+ // the overridden offer() method in this class.
+ return (Boolean)OFFER_LAST_METHOD.invoke(delegate, task);
}
catch (Throwable ex)
{
- //$FALL-THROUGH$
+ return false;
}
-
- delegate.offer(runnable);
}
- public boolean offer(Runnable r)
+ public boolean offer(Runnable task)
{
- int poolSize = threadPool.getPoolSize();
- if (poolSize < threadPool.getMaximumPoolSize() && poolSize == threadPool.getActiveCount())
+ if (threadPool.shallEnqueue())
{
- // Do not enqueue the new runnable command if we can create a new worker thread and there's currently no idle
- // thread.
- return false;
+ return delegate.offer(task);
}
- return delegate.offer(r);
+ return false;
+ }
+
+ public boolean offer(Runnable taske, long timeout, TimeUnit unit) throws InterruptedException
+ {
+ return delegate.offer(taske, timeout, unit);
}
@Override
@@ -279,6 +421,16 @@ public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionH
return delegate.size();
}
+ public Runnable take() throws InterruptedException
+ {
+ return delegate.take();
+ }
+
+ public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ return delegate.poll(timeout, unit);
+ }
+
public Runnable poll()
{
return delegate.poll();
@@ -295,24 +447,9 @@ public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionH
return delegate.peek();
}
- public void put(Runnable e) throws InterruptedException
+ public void put(Runnable task) throws InterruptedException
{
- delegate.put(e);
- }
-
- public boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException
- {
- return delegate.offer(e, timeout, unit);
- }
-
- public Runnable take() throws InterruptedException
- {
- return delegate.take();
- }
-
- public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException
- {
- return delegate.poll(timeout, unit);
+ delegate.put(task);
}
public int remainingCapacity()
@@ -345,4 +482,77 @@ public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionH
return new LinkedBlockingQueue<Runnable>();
}
}
+
+ /**
+ * @author Eike Stepper
+ */
+ private static final class DeadlockDetector extends Worker
+ {
+ public static final DeadlockDetector INSTANCE = new DeadlockDetector();
+
+ private volatile ArrayList<ThreadPool> pools = new ArrayList<ThreadPool>();
+
+ private DeadlockDetector()
+ {
+ setDaemon(true);
+ activate();
+ }
+
+ public void register(ThreadPool pool)
+ {
+ ArrayList<ThreadPool> newList = new ArrayList<ThreadPool>(pools);
+ newList.add(pool);
+ pools = newList;
+ }
+
+ private void unregister(ThreadPool pool)
+ {
+ ArrayList<ThreadPool> newList = new ArrayList<ThreadPool>(pools);
+ newList.remove(pool);
+ pools = newList;
+ }
+
+ @Override
+ protected String getThreadName()
+ {
+ return DeadlockDetector.class.getSimpleName();
+ }
+
+ @Override
+ protected void work(WorkContext context) throws Exception
+ {
+ ArrayList<ThreadPool> list = pools;
+ int size = list.size();
+
+ for (int i = 0; i < size; i++)
+ {
+ ThreadPool pool = list.get(i);
+ if (pool.isShutdown())
+ {
+ unregister(pool);
+ continue;
+ }
+
+ work(pool);
+ }
+
+ context.nextWork(deadlockDetectionInterval);
+ }
+
+ private void work(ThreadPool pool)
+ {
+ int lastRunTasks = pool.runTasks.get();
+ if (lastRunTasks != pool.lastRunTasks)
+ {
+ pool.lastRunTasks = lastRunTasks;
+ }
+ else
+ {
+ if (pool.getPoolSize() == pool.getMaximumPoolSize())
+ {
+ pool.potentialDeadlockDetected();
+ }
+ }
+ }
+ }
}
diff --git a/plugins/org.eclipse.net4j/.settings/.api_filters b/plugins/org.eclipse.net4j/.settings/.api_filters
index a2d1ab5b6b..1af03789e4 100644
--- a/plugins/org.eclipse.net4j/.settings/.api_filters
+++ b/plugins/org.eclipse.net4j/.settings/.api_filters
@@ -32,6 +32,15 @@
</message_arguments>
</filter>
</resource>
+ <resource path="src/org/eclipse/net4j/buffer/IBuffer.java" type="org.eclipse.net4j.buffer.IBuffer">
+ <filter id="389242988">
+ <message_arguments>
+ <message_argument value="org.eclipse.net4j.buffer.IBuffer"/>
+ <message_argument value="HEADER_SIZE"/>
+ <message_argument value="4"/>
+ </message_arguments>
+ </filter>
+ </resource>
<resource path="src/org/eclipse/net4j/connector/IConnector.java" type="org.eclipse.net4j.connector.IConnector">
<filter id="571473929">
<message_arguments>
diff --git a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
index 2fe2a2bea0..974a05cf28 100644
--- a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-SymbolicName: org.eclipse.net4j;singleton:=true
-Bundle-Version: 4.7.100.qualifier
+Bundle-Version: 4.8.0.qualifier
Bundle-Name: %pluginName
Bundle-Vendor: %providerName
Bundle-Localization: plugin
@@ -11,7 +11,7 @@ Bundle-RequiredExecutionEnvironment: J2SE-1.5
Bundle-ClassPath: .
Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resolution:=optional,
org.eclipse.net4j.util;bundle-version="[3.0.0,4.0.0)";visibility:=reexport
-Export-Package: org.eclipse.internal.net4j;version="4.7.100";
+Export-Package: org.eclipse.internal.net4j;version="4.8.0";
x-friends:="org.eclipse.net4j.http.server,
org.eclipse.net4j.jvm,
org.eclipse.net4j.tcp,
@@ -19,7 +19,7 @@ Export-Package: org.eclipse.internal.net4j;version="4.7.100";
org.eclipse.net4j.http.common,
org.eclipse.net4j.http.tests,
org.eclipse.net4j.tests",
- org.eclipse.internal.net4j.buffer;version="4.7.100";
+ org.eclipse.internal.net4j.buffer;version="4.8.0";
x-friends:="org.eclipse.net4j.http.server,
org.eclipse.net4j.jvm,
org.eclipse.net4j.tcp,
@@ -28,18 +28,18 @@ Export-Package: org.eclipse.internal.net4j;version="4.7.100";
org.eclipse.net4j.http.tests,
org.eclipse.net4j.tests,
org.eclipse.net4j.trace",
- org.eclipse.internal.net4j.bundle;version="4.7.100";x-internal:=true,
- org.eclipse.net4j;version="4.7.100",
- org.eclipse.net4j.acceptor;version="4.7.100",
- org.eclipse.net4j.buffer;version="4.7.100",
- org.eclipse.net4j.channel;version="4.7.100",
- org.eclipse.net4j.connector;version="4.7.100",
- org.eclipse.net4j.protocol;version="4.7.100",
- org.eclipse.net4j.signal;version="4.7.100",
- org.eclipse.net4j.signal.confirmation;version="4.7.100",
- org.eclipse.net4j.signal.heartbeat;version="4.7.100",
- org.eclipse.net4j.signal.security;version="4.7.100",
- org.eclipse.net4j.signal.wrapping;version="4.7.100",
- org.eclipse.spi.net4j;version="4.7.100"
+ org.eclipse.internal.net4j.bundle;version="4.8.0";x-internal:=true,
+ org.eclipse.net4j;version="4.8.0",
+ org.eclipse.net4j.acceptor;version="4.8.0",
+ org.eclipse.net4j.buffer;version="4.8.0",
+ org.eclipse.net4j.channel;version="4.8.0",
+ org.eclipse.net4j.connector;version="4.8.0",
+ org.eclipse.net4j.protocol;version="4.8.0",
+ org.eclipse.net4j.signal;version="4.8.0",
+ org.eclipse.net4j.signal.confirmation;version="4.8.0",
+ org.eclipse.net4j.signal.heartbeat;version="4.8.0",
+ org.eclipse.net4j.signal.security;version="4.8.0",
+ org.eclipse.net4j.signal.wrapping;version="4.8.0",
+ org.eclipse.spi.net4j;version="4.8.0"
Eclipse-BuddyPolicy: registered
Automatic-Module-Name: org.eclipse.net4j
diff --git a/plugins/org.eclipse.net4j/pom.xml b/plugins/org.eclipse.net4j/pom.xml
index c6b9b5932e..ae00c6a5e3 100644
--- a/plugins/org.eclipse.net4j/pom.xml
+++ b/plugins/org.eclipse.net4j/pom.xml
@@ -25,7 +25,7 @@
<groupId>org.eclipse.emf.cdo</groupId>
<artifactId>org.eclipse.net4j</artifactId>
- <version>4.7.100-SNAPSHOT</version>
+ <version>4.8.0-SNAPSHOT</version>
<packaging>eclipse-plugin</packaging>
</project>
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
index b7ddf75411..cd65d47f25 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java
@@ -12,7 +12,6 @@
package org.eclipse.internal.net4j.buffer;
import org.eclipse.net4j.buffer.BufferState;
-import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.util.HexUtil;
import org.eclipse.net4j.util.IErrorHandler;
@@ -196,7 +195,7 @@ public class Buffer implements InternalBuffer
if (state == BufferState.INITIAL)
{
- byteBuffer.limit(IBuffer.HEADER_SIZE);
+ byteBuffer.limit(HEADER_SIZE);
state = BufferState.READING_HEADER;
}
@@ -298,7 +297,7 @@ public class Buffer implements InternalBuffer
this.channelID = channelID;
byteBuffer.clear();
- byteBuffer.position(IBuffer.HEADER_SIZE);
+ byteBuffer.position(HEADER_SIZE);
}
return byteBuffer;
@@ -340,7 +339,7 @@ public class Buffer implements InternalBuffer
throw new IllegalStateException(toString() + ": channelID == NO_CHANNEL"); //$NON-NLS-1$
}
- int payloadSize = byteBuffer.position() - IBuffer.HEADER_SIZE + MAKE_PAYLOAD_SIZE_NON_ZERO;
+ int payloadSize = byteBuffer.position() - HEADER_SIZE + MAKE_PAYLOAD_SIZE_NON_ZERO;
boolean eos = isEOS();
if (eos)
@@ -402,7 +401,7 @@ public class Buffer implements InternalBuffer
}
byteBuffer.flip();
- byteBuffer.position(IBuffer.HEADER_SIZE);
+ byteBuffer.position(HEADER_SIZE);
state = BufferState.GETTING;
}
catch (RuntimeException ex)
@@ -508,7 +507,7 @@ public class Buffer implements InternalBuffer
if (state == BufferState.PUTTING && !showHeader)
{
- byteBuffer.position(IBuffer.HEADER_SIZE);
+ byteBuffer.position(HEADER_SIZE);
}
StringBuilder builder = new StringBuilder();
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
index 363e27883d..91de7d4f95 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java
@@ -115,7 +115,7 @@ public class BufferInputStream extends InputStream implements IBufferHandler
public void handleBuffer(IBuffer buffer)
{
- // Stream has been closed - ignore the new buffer
+ // If stream has been closed - ignore the new buffer.
if (buffers != null)
{
buffers.add(buffer);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java
index 7f7d5f3877..765f24d091 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java
@@ -87,7 +87,17 @@ public interface IBuffer
*/
public static final short MAX_CHANNEL = Short.MAX_VALUE;
- public static final short HEADER_SIZE = 4;
+ /**
+ * @since 4.8
+ */
+ public static final int CHANNEL_ID_BYTES = Short.BYTES;
+
+ /**
+ * @since 4.8
+ */
+ public static final int PAYLOAD_SIZE_BYTES = Short.BYTES;
+
+ public static final short HEADER_SIZE = CHANNEL_ID_BYTES + PAYLOAD_SIZE_BYTES;
/**
* Returns the {@link IBufferProvider} that has provided this buffer and that this buffer will be returned to when its
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java
index 8e839e1b35..9d6b959251 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java
@@ -14,8 +14,6 @@ import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
-import org.eclipse.spi.net4j.InternalChannel;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
@@ -79,7 +77,7 @@ public class ChannelInputStream extends BufferInputStream
{
if (isCCAM())
{
- ExecutorService executorService = ((InternalChannel)channel).getReceiveExecutor();
+ ExecutorService executorService = ConcurrencyUtil.getExecutorService(channel);
executorService.submit(new Runnable()
{
public void run()
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
index 3fb4ad76da..bb2e62e529 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
@@ -14,6 +14,7 @@ import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.BufferOutputStream;
import org.eclipse.net4j.util.ImplementationError;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.RunnableWithName;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.om.monitor.Monitor;
@@ -32,6 +33,8 @@ import java.util.concurrent.Future;
*/
public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmation<RESULT>
{
+ private static final String MAIN_MONITOR_NAME = RequestWithMonitoring.class.getSimpleName() + "-MainMonitor";
+
/**
* @since 2.0
*/
@@ -47,7 +50,7 @@ public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmat
*/
public static final int DEFAULT_MONITOR_TIMEOUT_SECONDS = 10;
- private OMMonitor mainMonitor;
+ private volatile OMMonitor mainMonitor;
private OMMonitor remoteMonitor;
@@ -137,18 +140,29 @@ public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmat
ExecutorService executorService = getCancelationExecutorService();
if (executorService != null)
{
- executorService.execute(new Runnable()
+ executorService.execute(new RunnableWithName()
{
- public void run()
+ @Override
+ public String getName()
{
+ return MAIN_MONITOR_NAME;
+ }
+
+ @Override
+ protected void doRun()
+ {
+ SignalProtocol<?> protocol = getProtocol();
+ int correlationID = getCorrelationID();
+ long cancelationPollInterval = getCancelationPollInterval();
+
while (mainMonitor != null)
{
- ConcurrencyUtil.sleep(getCancelationPollInterval());
+ ConcurrencyUtil.sleep(cancelationPollInterval);
if (mainMonitor != null && mainMonitor.isCanceled())
{
try
{
- new MonitorCanceledRequest(getProtocol(), getCorrelationID()).sendAsync();
+ new MonitorCanceledRequest(protocol, correlationID).sendAsync();
}
catch (Exception ex)
{
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java
index 8dbedaffce..9f106f2464 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java
@@ -14,6 +14,11 @@ import org.eclipse.net4j.util.collection.HashBag;
import org.eclipse.net4j.util.event.IEvent;
import org.eclipse.net4j.util.event.IListener;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Provides {@link Signal signal} execution counts when
* {@link SignalProtocol#addListener(IListener) attached} to a {@link ISignalProtocol signal protocol}.
@@ -23,6 +28,8 @@ import org.eclipse.net4j.util.event.IListener;
*/
public final class SignalCounter implements IListener
{
+ private static final boolean fullyQualifiedNames = Boolean.getBoolean("org.eclipse.net4j.signal.SignalCounter.fullyQualifiedNames");
+
private HashBag<Class<? extends Signal>> signals = new HashBag<Class<? extends Signal>>();
private final ISignalProtocol<?> protocol;
@@ -85,6 +92,37 @@ public final class SignalCounter implements IListener
}
}
+ /**
+ * @since 4.8
+ */
+ public void dump(PrintStream out, boolean clearCountsWhenDone)
+ {
+ synchronized (signals)
+ {
+ Map<String, Class<? extends Signal>> signalTypes = new HashMap<String, Class<? extends Signal>>();
+
+ for (Class<? extends Signal> signalType : signals)
+ {
+ String name = fullyQualifiedNames ? signalType.getName() : signalType.getSimpleName();
+ signalTypes.put(name, signalType);
+ }
+
+ String[] names = signalTypes.keySet().toArray(new String[signalTypes.size()]);
+ Arrays.sort(names);
+
+ for (String name : names)
+ {
+ Class<? extends Signal> signalType = signalTypes.get(name);
+ out.println(name + " = " + signals.getCounterFor(signalType));
+ }
+
+ if (clearCountsWhenDone)
+ {
+ clearCounts();
+ }
+ }
+ }
+
public void notifyEvent(IEvent event)
{
if (event instanceof SignalFinishedEvent)
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
index 70542eac7b..e993794e62 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java
@@ -16,11 +16,10 @@ import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferHandler;
import org.eclipse.net4j.channel.IChannelMultiplexer;
import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
-import org.eclipse.net4j.util.concurrent.IWorkSerializer;
import org.eclipse.net4j.util.concurrent.RunnableWithName;
-import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer;
+import org.eclipse.net4j.util.concurrent.SerializingExecutor;
import org.eclipse.net4j.util.event.Event;
import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.lifecycle.Lifecycle;
@@ -55,15 +54,13 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
private short id = IBuffer.NO_CHANNEL;
- private ExecutorService receiveExecutor;
+ private final Executor receiveSerializer = new SerializingExecutor();
/**
* The external handler for buffers passed from the {@link #connector}.
*/
private IBufferHandler receiveHandler;
- private IWorkSerializer receiveSerializer;
-
private transient Queue<IBuffer> sendQueue;
private transient long sentBuffers;
@@ -125,17 +122,19 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
*/
public ExecutorService getExecutorService()
{
- return receiveExecutor;
+ return ConcurrencyUtil.getExecutorService(channelMultiplexer);
}
+ @Deprecated
public ExecutorService getReceiveExecutor()
{
- return receiveExecutor;
+ return null;
}
+ @Deprecated
public void setReceiveExecutor(ExecutorService receiveExecutor)
{
- this.receiveExecutor = receiveExecutor;
+ // Do nothing.
}
public IBufferHandler getReceiveHandler()
@@ -217,8 +216,6 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
* Handles a buffer sent by the multiplexer. Adds work to the receive queue or releases the buffer.
*
* @see InternalChannelMultiplexer#multiplexChannel
- * @see IWorkSerializer
- * @see ReceiverWork
*/
public void handleBufferFromMultiplexer(IBuffer buffer)
{
@@ -232,7 +229,7 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
++receivedBuffers;
ReceiverWork receiverWork = createReceiverWork(buffer);
- receiveSerializer.addWork(receiverWork);
+ receiveSerializer.execute(receiverWork);
}
else
{
@@ -286,26 +283,14 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
{
super.doActivate();
sendQueue = new SendQueue();
- if (receiveExecutor != null)
- {
- receiveSerializer = new ReceiveSerializer2(receiveExecutor);
- LifecycleUtil.activate(receiveSerializer);
- }
- else
- {
- receiveSerializer = new SynchronousWorkSerializer();
- }
+ LifecycleUtil.activate(receiveSerializer);
}
@Override
protected void doDeactivate() throws Exception
{
unregisterFromMultiplexer();
- if (receiveSerializer != null)
- {
- receiveSerializer.dispose();
- receiveSerializer = null;
- }
+ LifecycleUtil.deactivate(receiveSerializer);
if (sendQueue != null)
{
@@ -332,11 +317,9 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
}
/**
- * If the meaning of this type isn't clear, there really should be more of a description here...
- *
* @author Eike Stepper
* @since 4.1
- * @deprecated As of 4.4 use {@link ExecutorWorkSerializer}.
+ * @deprecated As of 4.4 scheduled for future removal.
*/
@Deprecated
protected class ReceiveSerializer extends org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer
@@ -361,29 +344,6 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
* If the meaning of this type isn't clear, there really should be more of a description here...
*
* @author Eike Stepper
- * @since 4.4
- */
- private class ReceiveSerializer2 extends ExecutorWorkSerializer
- {
- public ReceiveSerializer2(Executor executor)
- {
- super(executor);
- }
-
- @Override
- protected void noWork()
- {
- if (isClosed())
- {
- dispose();
- }
- }
- }
-
- /**
- * If the meaning of this type isn't clear, there really should be more of a description here...
- *
- * @author Eike Stepper
*/
protected class ReceiverWork extends RunnableWithName
{
@@ -403,7 +363,7 @@ public class Channel extends Lifecycle implements InternalChannel, IExecutorServ
@Override
public String getName()
{
- return "Net4jReceiveSerializer-" + Channel.this; //$NON-NLS-1$
+ return "Net4jReceiver-" + Channel.this; //$NON-NLS-1$
}
@Override
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
index 1755eeba4c..70defaa3a1 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java
@@ -22,6 +22,8 @@ import org.eclipse.net4j.protocol.IProtocolProvider;
import org.eclipse.net4j.protocol.ProtocolVersionException;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
import org.eclipse.net4j.util.StringUtil;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
+import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException;
import org.eclipse.net4j.util.container.Container;
import org.eclipse.net4j.util.factory.FactoryKey;
@@ -40,6 +42,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
/**
* If the meaning of this type isn't clear, there really should be more of a description here...
@@ -47,7 +50,7 @@ import java.util.concurrent.ConcurrentMap;
* @author Eike Stepper
* @since 2.0
*/
-public abstract class ChannelMultiplexer extends Container<IChannel> implements InternalChannelMultiplexer
+public abstract class ChannelMultiplexer extends Container<IChannel> implements InternalChannelMultiplexer, IExecutorServiceProvider
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, ChannelMultiplexer.class);
@@ -83,6 +86,11 @@ public abstract class ChannelMultiplexer extends Container<IChannel> implements
this.config = Net4jUtil.copyTransportConfig(this, config);
}
+ public ExecutorService getExecutorService()
+ {
+ return ConcurrencyUtil.getExecutorService(config);
+ }
+
public long getOpenChannelTimeout()
{
if (openChannelTimeout == IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT)
@@ -234,7 +242,6 @@ public abstract class ChannelMultiplexer extends Container<IChannel> implements
protected void initChannel(InternalChannel channel, IProtocol<?> protocol)
{
channel.setMultiplexer(this);
- channel.setReceiveExecutor(getConfig().getReceiveExecutor());
if (protocol != null)
{
protocol.setChannel(channel);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java
index bfa3674c1d..71574dd889 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java
@@ -37,8 +37,16 @@ public interface InternalChannel extends IChannel, IBufferProvider, ILifecycle
*/
public void setUserID(String userID);
+ /**
+ * @deprecated As of 4.8 no longer supported.
+ */
+ @Deprecated
public ExecutorService getReceiveExecutor();
+ /**
+ * @deprecated As of 4.8 no longer supported.
+ */
+ @Deprecated
public void setReceiveExecutor(ExecutorService receiveExecutor);
/**
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
index 04a73386a2..060553c08b 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java
@@ -14,6 +14,7 @@ import org.eclipse.net4j.buffer.IBufferProvider;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.protocol.IProtocol2;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
+import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider;
import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.lifecycle.ILifecycle;
@@ -141,7 +142,7 @@ public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle implements IPr
if (channel != null)
{
channel.addListener(channelListener);
- executorService = ((InternalChannel)channel).getReceiveExecutor();
+ executorService = ConcurrencyUtil.getExecutorService(channel);
bufferProvider = (InternalChannel)channel;
}
}

Back to the top