Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/AsynchronousWorkSerializer.java74
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/CompletionWorkSerializer.java60
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/OnePendingExecutor.java64
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java6
4 files changed, 178 insertions, 26 deletions
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/AsynchronousWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/AsynchronousWorkSerializer.java
index b08087af75..c7d940379e 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/AsynchronousWorkSerializer.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/AsynchronousWorkSerializer.java
@@ -31,6 +31,8 @@ public class AsynchronousWorkSerializer implements IWorkSerializer, Runnable
private Occupation occupation = new Occupation();
+ // private Object newElementLock = new Object();
+
public AsynchronousWorkSerializer(ExecutorService executorService, Queue<Runnable> workQueue)
{
if (executorService == null)
@@ -54,22 +56,26 @@ public class AsynchronousWorkSerializer implements IWorkSerializer, Runnable
public void addWork(Runnable work)
{
- workQueue.add(work);
-
- // isOccupied can (and must) be called unsynchronized here
- if (!occupation.isOccupied())
+ // Need to be a block of execution. Cannot add when doing last check
+ // XXX synchronized (newElementLock)
{
- synchronized (occupation)
- {
- occupation.setOccupied(true);
- }
+ workQueue.add(work);
- if (TRACER.isEnabled())
+ // isOccupied can (and must) be called unsynchronized here
+ if (!occupation.isOccupied())
{
- TRACER.trace("Notifying executor service"); //$NON-NLS-1$
- }
+ synchronized (occupation)
+ {
+ occupation.setOccupied(true);
+ }
+
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Notifying executor service"); //$NON-NLS-1$
+ }
- executorService.execute(this);
+ executorService.execute(this);
+ }
}
}
@@ -80,25 +86,47 @@ public class AsynchronousWorkSerializer implements IWorkSerializer, Runnable
*/
public void run()
{
- synchronized (occupation)
+ // XXX synchronized (occupation)
{
Runnable work;
- while (occupation.isOccupied() && (work = workQueue.poll()) != null)
+ // for (;;)
{
- try
+ while (occupation.isOccupied() && (work = workQueue.poll()) != null)
{
- work.run();
- }
- catch (RuntimeException ex)
- {
- if (TRACER.isEnabled())
+ try
{
- TRACER.trace(ex);
+ work.run();
+ }
+ catch (RuntimeException ex)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace(ex);
+ }
}
}
- }
- occupation.setOccupied(false);
+ // try
+ // {
+ // Thread.sleep(500);
+ // }
+ // catch (InterruptedException ex)
+ // {
+ // throw WrappedException.wrap(ex);
+ // }
+
+ // Could put the sync in the while loop... but not efficient.
+ // Doing a last check to make sure that no one added something in the
+ // queue
+ // synchronized (newElementLock)
+ // {
+ // if (!occupation.isOccupied() || (work = workQueue.peek()) == null)
+ // {
+ // occupation.setOccupied(false);
+ // break;
+ // }
+ // }
+ }
}
}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/CompletionWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/CompletionWorkSerializer.java
new file mode 100644
index 0000000000..4e71f3882e
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/CompletionWorkSerializer.java
@@ -0,0 +1,60 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2007 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.internal.util.concurrent;
+
+import org.eclipse.net4j.util.concurrent.IWorkSerializer;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+
+/**
+ * @author Eike Stepper
+ */
+public class CompletionWorkSerializer implements IWorkSerializer
+{
+ private CompletionService completionService;
+
+ public CompletionWorkSerializer(CompletionService completionService)
+ {
+ this.completionService = completionService;
+ }
+
+ public CompletionWorkSerializer(Executor executor, BlockingQueue completionQueue)
+ {
+ this(new ExecutorCompletionService(executor, completionQueue));
+ }
+
+ public CompletionWorkSerializer(Executor executor)
+ {
+ this(new ExecutorCompletionService(executor));
+ }
+
+ public CompletionWorkSerializer()
+ {
+ this(new OnePendingExecutor());
+ }
+
+ public CompletionService getCompletionService()
+ {
+ return completionService;
+ }
+
+ public void dispose()
+ {
+ }
+
+ public void addWork(Runnable work)
+ {
+ completionService.submit(work, true);
+ }
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/OnePendingExecutor.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/OnePendingExecutor.java
new file mode 100644
index 0000000000..cb0e94ddba
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/OnePendingExecutor.java
@@ -0,0 +1,64 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2007 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.internal.util.concurrent;
+
+import java.util.concurrent.Executor;
+
+public class OnePendingExecutor implements Executor
+{
+ private Runnable command;
+
+ private Thread thread;
+
+ public OnePendingExecutor()
+ {
+ }
+
+ public synchronized void execute(Runnable command)
+ {
+ if (this.command != null)
+ {
+ throw new IllegalStateException("One command already pending");
+ }
+
+ this.command = command;
+ if (thread == null)
+ {
+ thread = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ for (;;)
+ {
+ Runnable command;
+ synchronized (OnePendingExecutor.this)
+ {
+ if (OnePendingExecutor.this.command == null)
+ {
+ thread = null;
+ return;
+ }
+
+ command = OnePendingExecutor.this.command;
+ OnePendingExecutor.this.command = null;
+ }
+
+ command.run();
+ }
+ }
+ };
+
+ thread.setDaemon(true);
+ thread.start();
+ }
+ }
+} \ No newline at end of file
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java
index dbddb4ddf4..9e49cbaf03 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java
@@ -17,7 +17,7 @@ import org.eclipse.net4j.IBufferProvider;
import org.eclipse.net4j.IChannel;
import org.eclipse.net4j.IChannelID;
import org.eclipse.net4j.IConnector;
-import org.eclipse.net4j.internal.util.concurrent.AsynchronousWorkSerializer;
+import org.eclipse.net4j.internal.util.concurrent.CompletionWorkSerializer;
import org.eclipse.net4j.internal.util.concurrent.SynchronousWorkSerializer;
import org.eclipse.net4j.internal.util.lifecycle.Lifecycle;
import org.eclipse.net4j.internal.util.om.trace.ContextTracer;
@@ -215,13 +215,13 @@ public class Channel extends Lifecycle implements IChannel, IBufferProvider
super.doActivate();
sendQueue = new ConcurrentLinkedQueue();
if (receiveExecutor == null)
- // XXX if (true)
{
receiveSerializer = new SynchronousWorkSerializer();
}
else
{
- receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor);
+ // receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor);
+ receiveSerializer = new CompletionWorkSerializer(receiveExecutor);
}
}

Back to the top