Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2019-02-01 11:04:51 -0500
committerEike Stepper2019-02-01 11:04:51 -0500
commitddbf0ebab3920ef12645400a1b929d8accd97831 (patch)
tree5e384f10a49d84d3a0e50c927d444481c87616ee /plugins/org.eclipse.net4j.tests/src
parent3ff7e9362774a011ceb8e5937ab4f20e87fe27b0 (diff)
downloadcdo-ddbf0ebab3920ef12645400a1b929d8accd97831.tar.gz
cdo-ddbf0ebab3920ef12645400a1b929d8accd97831.tar.xz
cdo-ddbf0ebab3920ef12645400a1b929d8accd97831.zip
[544045] Various concurrency improvements (IWorkSerializer, ThreadPool, RWOLockManager)
https://bugs.eclipse.org/bugs/show_bug.cgi?id=544045
Diffstat (limited to 'plugins/org.eclipse.net4j.tests/src')
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java16
-rw-r--r--plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java240
2 files changed, 244 insertions, 12 deletions
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java
index 0518006ff1..a7e3fda1e1 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ExecutorWorkSerializerTest.java
@@ -11,7 +11,7 @@
*/
package org.eclipse.net4j.util.tests;
-import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
+import org.eclipse.net4j.util.concurrent.SerializingExecutor;
import org.eclipse.net4j.util.io.IOUtil;
import java.util.Random;
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * A test for {@link ExecutorWorkSerializer}.
+ * A test for {@link SerializingExecutor}.
*
* @author Andre Dietisheim
*/
@@ -46,8 +46,8 @@ public class ExecutorWorkSerializerTest extends AbstractOMTest
/** The thread pool to execute the work unit producers in. */
private ExecutorService threadPool;
- /** The queue worker to submit the work units to. */
- private ExecutorWorkSerializer queueWorker;
+ /** The executor to submit the work units to. */
+ private SerializingExecutor serializer;
@Override
public void setUp()
@@ -56,14 +56,14 @@ public class ExecutorWorkSerializerTest extends AbstractOMTest
workConsumedLatch = new CountDownLatch(NUM_WORK);
threadPool = Executors.newFixedThreadPool(NUM_WORKPRODUCER_THREADS);
- queueWorker = new ExecutorWorkSerializer(threadPool);
- queueWorker.activate();
+ serializer = new SerializingExecutor(threadPool);
+ serializer.activate();
}
@Override
public void tearDown()
{
- queueWorker.dispose();
+ serializer.deactivate();
threadPool.shutdown();
}
@@ -165,7 +165,7 @@ public class ExecutorWorkSerializerTest extends AbstractOMTest
int currentWorkProduced;
while ((currentWorkProduced = workProduced.getAndIncrement()) < NUM_WORK)
{
- queueWorker.addWork(createWork(currentWorkProduced));
+ serializer.execute(createWork(currentWorkProduced));
Thread.sleep(random.nextInt(1000));
}
diff --git a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java
index 88fcdadac8..6dacd1e72d 100644
--- a/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java
+++ b/plugins/org.eclipse.net4j.tests/src/org/eclipse/net4j/util/tests/ThreadPoolTest.java
@@ -12,9 +12,14 @@ package org.eclipse.net4j.util.tests;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.concurrent.ThreadPool;
+import org.eclipse.net4j.util.tests.ThreadPoolTest.TaskManager.Task;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A test for {@link ThreadPool}.
@@ -23,9 +28,9 @@ import java.util.concurrent.TimeUnit;
*/
public class ThreadPoolTest extends AbstractOMTest
{
- public void testThreadPool() throws Exception
+ public void testExceedMaximumPoolSize() throws Exception
{
- final ThreadPool pool = ThreadPool.create("test", 100, 200, 60);
+ final ThreadPool pool = ThreadPool.create("test", 10, 20, 60);
try
{
@@ -36,7 +41,7 @@ public class ThreadPoolTest extends AbstractOMTest
{
final int n = i;
msg("scheduling " + n);
- pool.submit(new Runnable()
+ pool.execute(new Runnable()
{
public void run()
{
@@ -48,11 +53,238 @@ public class ThreadPoolTest extends AbstractOMTest
}
latch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
- msg("FINISHED");
+ msg("FINISHED with largest pool size = " + pool.getLargestPoolSize());
}
finally
{
pool.shutdownNow();
}
}
+
+ public void testWithKeepAlive() throws Exception
+ {
+ runTest(1000);
+ }
+
+ public void testWithoutKeepAlive() throws Exception
+ {
+ runTest(0);
+ }
+
+ private void runTest(long keepAliveTime)
+ {
+ TaskManager taskManager = new TaskManager(10, 20, keepAliveTime);
+ int max = taskManager.getMaximumPoolSize();
+ int extra = 10;
+ int count = max + 10;
+
+ for (int run = 0; run < 10; run++)
+ {
+ System.out.println("RUN " + (run + 1));
+
+ Task[] tasks = taskManager.createTasks(count);
+ assertEquals(count, taskManager.getCreatedTasks());
+ assertEquals(0, taskManager.getScheduledTasks());
+ assertEquals(0, taskManager.getCurrentlyEnqueuedTasks());
+ assertEquals(0, taskManager.getFinishedTasks());
+
+ taskManager.schedule(tasks, 0, count);
+ assertEquals(count, taskManager.getScheduledTasks());
+ sleep(20);
+ assertEquals(max, taskManager.getStartedTasks());
+ assertEquals(0, taskManager.getFinishedTasks());
+ assertEquals(extra, taskManager.getCurrentlyEnqueuedTasks());
+
+ for (int i = 1; i <= extra; i++)
+ {
+ tasks[i - 1].finish();
+
+ sleep(10);
+ assertEquals(max + i, taskManager.getStartedTasks());
+ assertEquals(i, taskManager.getFinishedTasks());
+ assertEquals(extra - i, taskManager.getCurrentlyEnqueuedTasks());
+ }
+
+ assertEquals(count, taskManager.getStartedTasks());
+ assertEquals(extra, taskManager.getFinishedTasks());
+ assertEquals(0, taskManager.getCurrentlyEnqueuedTasks());
+
+ for (int i = extra; i < count; i++)
+ {
+ tasks[i].finish();
+ }
+
+ sleep(20);
+ assertEquals(count, taskManager.getStartedTasks());
+ assertEquals(count, taskManager.getFinishedTasks());
+ assertEquals(0, taskManager.getCurrentlyEnqueuedTasks());
+
+ taskManager.resetStatistics();
+ }
+ }
+
+ // public static void assertEquals(int expected, int actual)
+ // {
+ // try
+ // {
+ // Assert.assertEquals(expected, actual);
+ // }
+ // catch (RuntimeException ex)
+ // {
+ // ex.printStackTrace();
+ // }
+ // }
+
+ /**
+ * @author Eike Stepper
+ */
+ public static class TaskManager extends ThreadPool
+ {
+ private final AtomicInteger createdTasks = new AtomicInteger();
+
+ private final AtomicInteger scheduledTasks = new AtomicInteger();
+
+ private final AtomicInteger startedTasks = new AtomicInteger();
+
+ private final AtomicInteger finishedTasks = new AtomicInteger();
+
+ public TaskManager(int corePoolSize, int maximumPoolSize, long keepAliveTime)
+ {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, createThreadFactory());
+ }
+
+ public int getCreatedTasks()
+ {
+ return createdTasks.get();
+ }
+
+ public int getScheduledTasks()
+ {
+ return scheduledTasks.get();
+ }
+
+ public int getStartedTasks()
+ {
+ return startedTasks.get();
+ }
+
+ public int getFinishedTasks()
+ {
+ return finishedTasks.get();
+ }
+
+ public int getCurrentlyEnqueuedTasks()
+ {
+ return getQueue().size();
+ }
+
+ public int getInactiveWorkers()
+ {
+ return getPoolSize() - getActiveCount();
+ }
+
+ public void resetStatistics()
+ {
+ createdTasks.set(0);
+ scheduledTasks.set(0);
+ startedTasks.set(0);
+ finishedTasks.set(0);
+ }
+
+ public Task[] createTasks(int count)
+ {
+ Task[] result = new Task[count];
+ for (int i = 0; i < result.length; i++)
+ {
+ result[i] = new Task(this, i + 1);
+ }
+
+ return result;
+ }
+
+ public void schedule(Task[] tasks, int start, int end)
+ {
+ for (int i = start; i < end; i++)
+ {
+ Task task = tasks[i];
+ execute(task);
+ }
+ }
+
+ @Override
+ public void execute(Runnable command)
+ {
+ scheduledTasks.incrementAndGet();
+ super.execute(command);
+ }
+
+ private static ThreadFactory createThreadFactory()
+ {
+ final ThreadFactory factory = Executors.defaultThreadFactory();
+
+ return new ThreadFactory()
+ {
+ public Thread newThread(Runnable task)
+ {
+ System.out.println("Creating new worker");
+ return factory.newThread(task);
+ }
+ };
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ public static class Task extends CountDownLatch implements Runnable
+ {
+ private final TaskManager manager;
+
+ private final int id;
+
+ private AtomicBoolean used = new AtomicBoolean();
+
+ public Task(TaskManager manager, int id)
+ {
+ super(1);
+ this.manager = manager;
+ this.id = id;
+
+ manager.createdTasks.incrementAndGet();
+ }
+
+ public final void finish()
+ {
+ countDown();
+ }
+
+ public final void run()
+ {
+ if (!used.compareAndSet(false, true))
+ {
+ throw new IllegalStateException(this + " has already been used");
+ }
+
+ manager.startedTasks.incrementAndGet();
+ System.out.println("Running " + this);
+
+ try
+ {
+ await();
+ }
+ catch (Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ System.out.println("Finished " + this);
+ manager.finishedTasks.incrementAndGet();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Task " + id;
+ }
+ }
+ }
}

Back to the top