diff options
4 files changed, 178 insertions, 26 deletions
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/AsynchronousWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/AsynchronousWorkSerializer.java index b08087af75..c7d940379e 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/AsynchronousWorkSerializer.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/AsynchronousWorkSerializer.java @@ -31,6 +31,8 @@ public class AsynchronousWorkSerializer implements IWorkSerializer, Runnable private Occupation occupation = new Occupation(); + // private Object newElementLock = new Object(); + public AsynchronousWorkSerializer(ExecutorService executorService, Queue<Runnable> workQueue) { if (executorService == null) @@ -54,22 +56,26 @@ public class AsynchronousWorkSerializer implements IWorkSerializer, Runnable public void addWork(Runnable work) { - workQueue.add(work); - - // isOccupied can (and must) be called unsynchronized here - if (!occupation.isOccupied()) + // Need to be a block of execution. Cannot add when doing last check + // XXX synchronized (newElementLock) { - synchronized (occupation) - { - occupation.setOccupied(true); - } + workQueue.add(work); - if (TRACER.isEnabled()) + // isOccupied can (and must) be called unsynchronized here + if (!occupation.isOccupied()) { - TRACER.trace("Notifying executor service"); //$NON-NLS-1$ - } + synchronized (occupation) + { + occupation.setOccupied(true); + } + + if (TRACER.isEnabled()) + { + TRACER.trace("Notifying executor service"); //$NON-NLS-1$ + } - executorService.execute(this); + executorService.execute(this); + } } } @@ -80,25 +86,47 @@ public class AsynchronousWorkSerializer implements IWorkSerializer, Runnable */ public void run() { - synchronized (occupation) + // XXX synchronized (occupation) { Runnable work; - while (occupation.isOccupied() && (work = workQueue.poll()) != null) + // for (;;) { - try + while (occupation.isOccupied() && (work = workQueue.poll()) != null) { - work.run(); - } - catch (RuntimeException ex) - { - if (TRACER.isEnabled()) + try { - TRACER.trace(ex); + work.run(); + } + catch (RuntimeException ex) + { + if (TRACER.isEnabled()) + { + TRACER.trace(ex); + } } } - } - occupation.setOccupied(false); + // try + // { + // Thread.sleep(500); + // } + // catch (InterruptedException ex) + // { + // throw WrappedException.wrap(ex); + // } + + // Could put the sync in the while loop... but not efficient. + // Doing a last check to make sure that no one added something in the + // queue + // synchronized (newElementLock) + // { + // if (!occupation.isOccupied() || (work = workQueue.peek()) == null) + // { + // occupation.setOccupied(false); + // break; + // } + // } + } } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/CompletionWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/CompletionWorkSerializer.java new file mode 100644 index 0000000000..4e71f3882e --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/CompletionWorkSerializer.java @@ -0,0 +1,60 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2007 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.internal.util.concurrent; + +import org.eclipse.net4j.util.concurrent.IWorkSerializer; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionService; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorCompletionService; + +/** + * @author Eike Stepper + */ +public class CompletionWorkSerializer implements IWorkSerializer +{ + private CompletionService completionService; + + public CompletionWorkSerializer(CompletionService completionService) + { + this.completionService = completionService; + } + + public CompletionWorkSerializer(Executor executor, BlockingQueue completionQueue) + { + this(new ExecutorCompletionService(executor, completionQueue)); + } + + public CompletionWorkSerializer(Executor executor) + { + this(new ExecutorCompletionService(executor)); + } + + public CompletionWorkSerializer() + { + this(new OnePendingExecutor()); + } + + public CompletionService getCompletionService() + { + return completionService; + } + + public void dispose() + { + } + + public void addWork(Runnable work) + { + completionService.submit(work, true); + } +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/OnePendingExecutor.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/OnePendingExecutor.java new file mode 100644 index 0000000000..cb0e94ddba --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/concurrent/OnePendingExecutor.java @@ -0,0 +1,64 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2007 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.internal.util.concurrent; + +import java.util.concurrent.Executor; + +public class OnePendingExecutor implements Executor +{ + private Runnable command; + + private Thread thread; + + public OnePendingExecutor() + { + } + + public synchronized void execute(Runnable command) + { + if (this.command != null) + { + throw new IllegalStateException("One command already pending"); + } + + this.command = command; + if (thread == null) + { + thread = new Thread() + { + @Override + public void run() + { + for (;;) + { + Runnable command; + synchronized (OnePendingExecutor.this) + { + if (OnePendingExecutor.this.command == null) + { + thread = null; + return; + } + + command = OnePendingExecutor.this.command; + OnePendingExecutor.this.command = null; + } + + command.run(); + } + } + }; + + thread.setDaemon(true); + thread.start(); + } + } +}
\ No newline at end of file diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java index dbddb4ddf4..9e49cbaf03 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java @@ -17,7 +17,7 @@ import org.eclipse.net4j.IBufferProvider; import org.eclipse.net4j.IChannel; import org.eclipse.net4j.IChannelID; import org.eclipse.net4j.IConnector; -import org.eclipse.net4j.internal.util.concurrent.AsynchronousWorkSerializer; +import org.eclipse.net4j.internal.util.concurrent.CompletionWorkSerializer; import org.eclipse.net4j.internal.util.concurrent.SynchronousWorkSerializer; import org.eclipse.net4j.internal.util.lifecycle.Lifecycle; import org.eclipse.net4j.internal.util.om.trace.ContextTracer; @@ -215,13 +215,13 @@ public class Channel extends Lifecycle implements IChannel, IBufferProvider super.doActivate(); sendQueue = new ConcurrentLinkedQueue(); if (receiveExecutor == null) - // XXX if (true) { receiveSerializer = new SynchronousWorkSerializer(); } else { - receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor); + // receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor); + receiveSerializer = new CompletionWorkSerializer(receiveExecutor); } } |