diff options
author | Eike Stepper | 2015-07-22 12:25:46 +0000 |
---|---|---|
committer | Eike Stepper | 2015-07-22 12:25:46 +0000 |
commit | 017c0e91d0dc68b7ead1b4c893f032d3d72e4e31 (patch) | |
tree | 43b63e279685bbafc4e131a243609487e5112c99 /plugins/org.eclipse.net4j.util | |
parent | c8eba7351ffd2d37e17db3ae3c2a7009d366b769 (diff) | |
download | cdo-017c0e91d0dc68b7ead1b4c893f032d3d72e4e31.tar.gz cdo-017c0e91d0dc68b7ead1b4c893f032d3d72e4e31.tar.xz cdo-017c0e91d0dc68b7ead1b4c893f032d3d72e4e31.zip |
[473277] Enhance ThreadPool and use it as much as possible
https://bugs.eclipse.org/bugs/show_bug.cgi?id=473277
Diffstat (limited to 'plugins/org.eclipse.net4j.util')
18 files changed, 1022 insertions, 119 deletions
diff --git a/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF index e85dcff502..a1071bf9f4 100644 --- a/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.net4j.util/META-INF/MANIFEST.MF @@ -1,7 +1,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-SymbolicName: org.eclipse.net4j.util;singleton:=true -Bundle-Version: 3.5.0.qualifier +Bundle-Version: 3.6.0.qualifier Bundle-Name: %pluginName Bundle-Vendor: %providerName Bundle-Localization: plugin @@ -15,34 +15,34 @@ Import-Package: org.eclipse.osgi.service.debug;version="[1.0.0,2.0.0)";resolutio org.osgi.framework;version="[1.3.0,2.0.0)";resolution:=optional, org.osgi.service.log;version="[1.3.0,2.0.0)";resolution:=optional, org.osgi.util.tracker;version="[1.3.0,2.0.0)";resolution:=optional -Export-Package: org.eclipse.net4j.internal.util.bundle;version="3.5.0";x-friends:="org.eclipse.net4j.util.ui,org.eclipse.net4j.tests", - org.eclipse.net4j.internal.util.container;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs", - org.eclipse.net4j.internal.util.factory;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs", - org.eclipse.net4j.internal.util.om;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs", - org.eclipse.net4j.internal.util.om.pref;version="3.5.0";x-friends:="org.eclipse.net4j.util.defs", - org.eclipse.net4j.internal.util.table;version="3.5.0";x-internal:=true, - org.eclipse.net4j.internal.util.test;version="3.5.0";x-friends:="org.eclipse.net4j.tests", - org.eclipse.net4j.util;version="3.5.0", - org.eclipse.net4j.util.cache;version="3.5.0", - org.eclipse.net4j.util.collection;version="3.5.0", - org.eclipse.net4j.util.concurrent;version="3.5.0", - org.eclipse.net4j.util.confirmation;version="3.5.0", - org.eclipse.net4j.util.container;version="3.5.0", - org.eclipse.net4j.util.container.delegate;version="3.5.0", - org.eclipse.net4j.util.event;version="3.5.0", - org.eclipse.net4j.util.factory;version="3.5.0", - org.eclipse.net4j.util.fsm;version="3.5.0", - org.eclipse.net4j.util.io;version="3.5.0", - org.eclipse.net4j.util.lifecycle;version="3.5.0", - org.eclipse.net4j.util.om;version="3.5.0", - org.eclipse.net4j.util.om.log;version="3.5.0", - org.eclipse.net4j.util.om.monitor;version="3.5.0", - org.eclipse.net4j.util.om.pref;version="3.5.0", - org.eclipse.net4j.util.om.trace;version="3.5.0", - org.eclipse.net4j.util.options;version="3.5.0", - org.eclipse.net4j.util.properties;version="3.5.0", - org.eclipse.net4j.util.ref;version="3.5.0", - org.eclipse.net4j.util.registry;version="3.5.0", - org.eclipse.net4j.util.security;version="3.5.0", - org.eclipse.net4j.util.transaction;version="3.5.0" +Export-Package: org.eclipse.net4j.internal.util.bundle;version="3.6.0";x-friends:="org.eclipse.net4j.util.ui,org.eclipse.net4j.tests", + org.eclipse.net4j.internal.util.container;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs", + org.eclipse.net4j.internal.util.factory;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs", + org.eclipse.net4j.internal.util.om;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs", + org.eclipse.net4j.internal.util.om.pref;version="3.6.0";x-friends:="org.eclipse.net4j.util.defs", + org.eclipse.net4j.internal.util.table;version="3.6.0";x-internal:=true, + org.eclipse.net4j.internal.util.test;version="3.6.0";x-friends:="org.eclipse.net4j.tests", + org.eclipse.net4j.util;version="3.6.0", + org.eclipse.net4j.util.cache;version="3.6.0", + org.eclipse.net4j.util.collection;version="3.6.0", + org.eclipse.net4j.util.concurrent;version="3.6.0", + org.eclipse.net4j.util.confirmation;version="3.6.0", + org.eclipse.net4j.util.container;version="3.6.0", + org.eclipse.net4j.util.container.delegate;version="3.6.0", + org.eclipse.net4j.util.event;version="3.6.0", + org.eclipse.net4j.util.factory;version="3.6.0", + org.eclipse.net4j.util.fsm;version="3.6.0", + org.eclipse.net4j.util.io;version="3.6.0", + org.eclipse.net4j.util.lifecycle;version="3.6.0", + org.eclipse.net4j.util.om;version="3.6.0", + org.eclipse.net4j.util.om.log;version="3.6.0", + org.eclipse.net4j.util.om.monitor;version="3.6.0", + org.eclipse.net4j.util.om.pref;version="3.6.0", + org.eclipse.net4j.util.om.trace;version="3.6.0", + org.eclipse.net4j.util.options;version="3.6.0", + org.eclipse.net4j.util.properties;version="3.6.0", + org.eclipse.net4j.util.ref;version="3.6.0", + org.eclipse.net4j.util.registry;version="3.6.0", + org.eclipse.net4j.util.security;version="3.6.0", + org.eclipse.net4j.util.transaction;version="3.6.0" Eclipse-BuddyPolicy: registered diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java index 887dc5693a..80133bffe3 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/AsynchronousWorkSerializer.java @@ -19,8 +19,10 @@ import java.util.concurrent.ExecutorService; /** * @author Eike Stepper + * @deprecated As of 3.6 use {@link ExecutorWorkSerializer}. */ -public class AsynchronousWorkSerializer implements IWorkSerializer, Runnable +@Deprecated +public class AsynchronousWorkSerializer implements IWorkSerializer, IExecutorServiceProvider, Runnable { private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONCURRENCY, AsynchronousWorkSerializer.class); diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java index 9b4dc04680..69534b6a06 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/CompletionWorkSerializer.java @@ -18,7 +18,9 @@ import java.util.concurrent.Future; /** * @author Eike Stepper + * @deprecated As of 3.6 use {@link ExecutorWorkSerializer}. */ +@Deprecated public class CompletionWorkSerializer implements IWorkSerializer { private CompletionService<Object> completionService; diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java index 303cf02b29..5a9905e821 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ConcurrencyUtil.java @@ -54,4 +54,36 @@ public final class ConcurrencyUtil { return ExecutorServiceFactory.get(container); } + + /** + * @since 3.6 + */ + public static ExecutorService getExecutorService(Object object) + { + if (object instanceof IExecutorServiceProvider) + { + try + { + return ((IExecutorServiceProvider)object).getExecutorService(); + } + catch (Exception ex) + { + //$FALL-THROUGH$ + } + } + + if (object instanceof IManagedContainer) + { + try + { + return getExecutorService((IManagedContainer)object); + } + catch (Exception ex) + { + //$FALL-THROUGH$ + } + } + + return null; + } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/DelegatingExecutorService.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/DelegatingExecutorService.java new file mode 100644 index 0000000000..cbd4806086 --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/DelegatingExecutorService.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2004-2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.concurrent; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public class DelegatingExecutorService implements ExecutorService +{ + private final ExecutorService delegate; + + public DelegatingExecutorService(ExecutorService delegate) + { + this.delegate = delegate; + } + + public void execute(Runnable command) + { + delegate.execute(command); + } + + public void shutdown() + { + // Do nothing. + } + + public List<Runnable> shutdownNow() + { + return Collections.emptyList(); + } + + public boolean isShutdown() + { + return delegate.isShutdown(); + } + + public boolean isTerminated() + { + return delegate.isTerminated(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException + { + return delegate.awaitTermination(timeout, unit); + } + + public <T> Future<T> submit(Callable<T> task) + { + return delegate.submit(task); + } + + public <T> Future<T> submit(Runnable task, T result) + { + return delegate.submit(task, result); + } + + public Future<?> submit(Runnable task) + { + return delegate.submit(task); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException + { + return delegate.invokeAll(tasks); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException + { + return delegate.invokeAll(tasks, timeout, unit); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException + { + return delegate.invokeAny(tasks); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.invokeAny(tasks, timeout, unit); + } +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java index 965deb6b6b..681d60dc0c 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorServiceFactory.java @@ -20,8 +20,6 @@ import org.eclipse.net4j.util.lifecycle.LifecycleState; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; /** * @author Eike Stepper @@ -40,87 +38,70 @@ public class ExecutorServiceFactory extends Factory super(PRODUCT_GROUP, TYPE); } - public ExecutorService create(String threadGroupName) + public ExecutorService create(String description) { - if (threadGroupName == null) - { - threadGroupName = DEFAULT_THREAD_GROUP_NAME; - } - - final ThreadGroup threadGroup = new ThreadGroup(threadGroupName); - ThreadFactory threadFactory = new ThreadFactory() - { - private int num; - - public Thread newThread(Runnable r) - { - Thread thread = new Thread(threadGroup, r, threadGroup.getName() + "-Thread-" + ++num); - thread.setDaemon(true); - return thread; - } - }; - - final ExecutorService executorService = Executors.newCachedThreadPool(threadFactory); + final ExecutorService executorService = ThreadPool.create(description); + return LifecycleUtil.delegateLifecycle(getClass().getClassLoader(), executorService, ExecutorService.class, new ILifecycle() - { - private boolean active; - - public void activate() throws LifecycleException - { - active = true; - } - - public Exception deactivate() - { - try - { - executorService.shutdown(); - active = false; - return null; - } - catch (Exception ex) { - return ex; - } - } - - public LifecycleState getLifecycleState() - { - return active ? LifecycleState.ACTIVE : LifecycleState.INACTIVE; - } - - public boolean isActive() - { - return active; - } - - public void addListener(IListener listener) - { - // Do nothing - } - - public void removeListener(IListener listener) - { - // Do nothing - } - - public IListener[] getListeners() - { - return EventUtil.NO_LISTENERS; - } - - public boolean hasListeners() - { - return false; - } - - @Override - public String toString() - { - return "CachedThreadPool"; - } - }); + private boolean active; + + public void activate() throws LifecycleException + { + active = true; + } + + public Exception deactivate() + { + try + { + executorService.shutdown(); + active = false; + return null; + } + catch (Exception ex) + { + return ex; + } + } + + public LifecycleState getLifecycleState() + { + return active ? LifecycleState.ACTIVE : LifecycleState.INACTIVE; + } + + public boolean isActive() + { + return active; + } + + public void addListener(IListener listener) + { + // Do nothing + } + + public void removeListener(IListener listener) + { + // Do nothing + } + + public IListener[] getListeners() + { + return EventUtil.NO_LISTENERS; + } + + public boolean hasListeners() + { + return false; + } + + @Override + public String toString() + { + return "CachedThreadPool"; + } + }); } public static ExecutorService get(IManagedContainer container) diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java new file mode 100644 index 0000000000..621ec5d51f --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ExecutorWorkSerializer.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.concurrent; + +import org.eclipse.net4j.util.lifecycle.Lifecycle; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.log.OMLogger; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executor; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public class ExecutorWorkSerializer extends Lifecycle implements IWorkSerializer +{ + private final Queue<Runnable> workQueue = new LinkedList<Runnable>(); + + private Executor executor; + + private volatile boolean working; + + private volatile boolean disposed; + + public ExecutorWorkSerializer() + { + } + + public ExecutorWorkSerializer(Executor executor) + { + this.executor = executor; + } + + public Executor getExecutor() + { + return executor; + } + + public void setExecutor(Executor executor) + { + checkInactive(); + this.executor = executor; + } + + public synchronized boolean addWork(Runnable runnable) + { + if (disposed) + { + return false; + } + + if (!working && isActive()) + { + startWork(runnable); + } + else + { + workQueue.add(runnable); + } + + return true; + } + + public synchronized void dispose() + { + LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG); + } + + @Override + public String toString() + { + return ExecutorWorkSerializer.class.getSimpleName(); + } + + protected void handleException(Runnable runnable, Throwable ex) + { + } + + protected void noWork() + { + } + + private void startWork(final Runnable runnable) + { + working = true; + if (!disposed) + { + executor.execute(new RunnableWithName() + { + public String getName() + { + if (runnable instanceof RunnableWithName) + { + return ((RunnableWithName)runnable).getName(); + } + + return null; + } + + public void run() + { + try + { + runnable.run(); + } + catch (Throwable ex) + { + try + { + handleException(runnable, ex); + } + catch (Throwable ignore) + { + //$FALL-THROUGH$ + } + } + + workDone(); + } + }); + } + } + + private synchronized void workDone() + { + Runnable runnable = workQueue.poll(); + if (runnable != null) + { + startWork(runnable); + } + else + { + noWork(); + working = false; + } + } + + @Override + protected void doBeforeActivate() throws Exception + { + super.doBeforeActivate(); + checkState(executor, "executor"); + } + + @Override + protected void doActivate() throws Exception + { + super.doActivate(); + workDone(); + } + + @Override + protected void doDeactivate() throws Exception + { + disposed = true; + working = false; + workQueue.clear(); + + super.doDeactivate(); + } +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IExecutorServiceProvider.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IExecutorServiceProvider.java new file mode 100644 index 0000000000..becd91b6c0 --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/IExecutorServiceProvider.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.concurrent; + +import java.util.concurrent.ExecutorService; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public interface IExecutorServiceProvider +{ + public ExecutorService getExecutorService(); +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java index 2baf087a8b..03163efaf2 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java @@ -15,7 +15,9 @@ import org.eclipse.net4j.util.om.log.OMLogger; /** * @author Eike Stepper + * @deprecated As of 3.6 use {@link ExecutorWorkSerializer}. */ +@Deprecated public class QueueWorkerWorkSerializer extends QueueRunner implements IWorkSerializer { public QueueWorkerWorkSerializer() diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java new file mode 100644 index 0000000000..048de5aa4e --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RunnableWithName.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.concurrent; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public interface RunnableWithName extends Runnable +{ + public String getName(); +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java new file mode 100644 index 0000000000..b172dc0598 --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/ThreadPool.java @@ -0,0 +1,384 @@ +/* + * Copyright (c) 2004-2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.util.concurrent; + +import org.eclipse.net4j.util.StringUtil; + +import java.lang.reflect.Method; +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionHandler +{ + public static final String DEFAULT_THREAD_GROUP_NAME = ExecutorServiceFactory.DEFAULT_THREAD_GROUP_NAME; + + public static final int DEFAULT_CORE_POOL_SIZE = 10; + + public static final int DEFAULT_MAXIMUM_POOL_SIZE = 100; + + public static final long DEFAULT_KEEP_ALIVE_SECONDS = 60; + + private static final Class<?> LINKED_BLOCKING_DEQUE_CLASS; + + private static final Method ADD_FIRST_METHOD; + + private final Executor defaultExecutor = new Executor() + { + public void execute(Runnable runnable) + { + ThreadPool.super.execute(runnable); + } + }; + + private final Executor namingExecutor = new Executor() + { + public void execute(Runnable runnable) + { + if (runnable instanceof RunnableWithName) + { + String name = ((RunnableWithName)runnable).getName(); + if (name != null) + { + Thread thread = new Thread(runnable, name); + thread.setDaemon(true); + thread.start(); + return; + } + } + + ThreadPool.super.execute(runnable); + } + }; + + private volatile Executor executor = defaultExecutor; + + public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, ThreadFactory threadFactory) + { + super(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, createWorkQueue(), threadFactory); + ((ThreadPool.WorkQueue)getQueue()).setThreadPool(this); + setRejectedExecutionHandler(this); + } + + public final void setNaming(boolean naming) + { + executor = naming ? namingExecutor : defaultExecutor; + } + + @Override + public void execute(final Runnable runnable) + { + executor.execute(runnable); + } + + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) + { + ((ThreadPool.WorkQueue)getQueue()).addFirst(runnable); + } + + public static ThreadPool create() + { + return create(null, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_SECONDS); + } + + public static ThreadPool create(String description) + { + String threadGroupName = null; + int corePoolSize = DEFAULT_CORE_POOL_SIZE; + int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE; + long keepAliveSeconds = DEFAULT_KEEP_ALIVE_SECONDS; + + if (!StringUtil.isEmpty(description)) + { + String[] tokens = description.split(":"); + if (tokens.length > 0) + { + threadGroupName = tokens[0]; + if (tokens.length > 1) + { + try + { + corePoolSize = Integer.parseInt(tokens[1]); + } + catch (NumberFormatException ex) + { + //$FALL-THROUGH$ + } + + if (tokens.length > 2) + { + try + { + maximumPoolSize = Integer.parseInt(tokens[2]); + } + catch (NumberFormatException ex) + { + //$FALL-THROUGH$ + } + + if (tokens.length > 3) + { + try + { + keepAliveSeconds = Long.parseLong(tokens[3]); + } + catch (NumberFormatException ex) + { + //$FALL-THROUGH$ + } + } + } + } + } + } + + return create(threadGroupName, corePoolSize, maximumPoolSize, keepAliveSeconds); + } + + public static ThreadPool create(String threadGroupName, int corePoolSize, int maximumPoolSize, long keepAliveSeconds) + { + ThreadFactory threadFactory = createThreadFactory(threadGroupName); + return new ThreadPool(corePoolSize, maximumPoolSize, keepAliveSeconds, threadFactory); + } + + private static ThreadFactory createThreadFactory(String threadGroupName) + { + if (threadGroupName == null) + { + threadGroupName = DEFAULT_THREAD_GROUP_NAME; + } + + final ThreadGroup threadGroup = new ThreadGroup(threadGroupName); + + ThreadFactory threadFactory = new ThreadFactory() + { + private final AtomicInteger num = new AtomicInteger(); + + public Thread newThread(Runnable r) + { + Thread thread = new Thread(threadGroup, r, threadGroup.getName() + "-thread-" + num.incrementAndGet()); + thread.setDaemon(true); + return thread; + } + }; + + return threadFactory; + } + + private static ThreadPool.WorkQueue createWorkQueue() + { + if (LINKED_BLOCKING_DEQUE_CLASS != null) + { + try + { + return new WorkQueueJRE16(); + } + catch (Throwable ex) + { + //$FALL-THROUGH$ + } + } + + return new WorkQueueJRE15(); + } + + static + { + Class<?> c = null; + Method m = null; + + try + { + c = Class.forName("java.util.concurrent.LinkedBlockingDeque"); + m = c.getMethod("addFirst", Object.class); + } + catch (Throwable ex) + { + c = null; + m = null; + } + + LINKED_BLOCKING_DEQUE_CLASS = c; + ADD_FIRST_METHOD = m; + } + + /** + * @author Eike Stepper + */ + private interface WorkQueue extends BlockingQueue<Runnable> + { + public void setThreadPool(ThreadPool threadPool); + + public void addFirst(Runnable runnable); + } + + /** + * @author Eike Stepper + */ + private static final class WorkQueueJRE15 extends LinkedBlockingQueue<Runnable>implements ThreadPool.WorkQueue + { + private static final long serialVersionUID = 1L; + + private ThreadPool threadPool; + + public WorkQueueJRE15() + { + } + + public void setThreadPool(ThreadPool threadPool) + { + this.threadPool = threadPool; + } + + public void addFirst(Runnable runnable) + { + super.offer(runnable); + } + + @Override + public boolean offer(Runnable runnable) + { + if (threadPool.getPoolSize() < threadPool.getMaximumPoolSize()) + { + return false; + } + + return super.offer(runnable); + } + } + + /** + * @author Eike Stepper + */ + private static final class WorkQueueJRE16 extends AbstractQueue<Runnable>implements ThreadPool.WorkQueue + { + private final BlockingQueue<Runnable> delegate = createDelegate(); + + private ThreadPool threadPool; + + public WorkQueueJRE16() + { + } + + public void setThreadPool(ThreadPool threadPool) + { + this.threadPool = threadPool; + } + + public void addFirst(Runnable runnable) + { + try + { + ADD_FIRST_METHOD.invoke(delegate, runnable); + } + catch (Throwable ex) + { + //$FALL-THROUGH$ + } + + delegate.offer(runnable); + } + + public boolean offer(Runnable r) + { + if (threadPool.getPoolSize() < threadPool.getMaximumPoolSize()) + { + return false; + } + + return delegate.offer(r); + } + + @Override + public int size() + { + return delegate.size(); + } + + public Runnable poll() + { + return delegate.poll(); + } + + @Override + public Iterator<Runnable> iterator() + { + return delegate.iterator(); + } + + public Runnable peek() + { + return delegate.peek(); + } + + public void put(Runnable e) throws InterruptedException + { + delegate.put(e); + } + + public boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException + { + return delegate.offer(e, timeout, unit); + } + + public Runnable take() throws InterruptedException + { + return delegate.take(); + } + + public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException + { + return delegate.poll(timeout, unit); + } + + public int remainingCapacity() + { + return delegate.remainingCapacity(); + } + + public int drainTo(Collection<? super Runnable> c) + { + return delegate.drainTo(c); + } + + public int drainTo(Collection<? super Runnable> c, int maxElements) + { + return delegate.drainTo(c, maxElements); + } + + @SuppressWarnings("unchecked") + private static BlockingQueue<Runnable> createDelegate() + { + try + { + return (BlockingQueue<Runnable>)LINKED_BLOCKING_DEQUE_CLASS.newInstance(); + } + catch (Throwable ex) + { + //$FALL-THROUGH$ + } + + return new LinkedBlockingQueue<Runnable>(); + } + } +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java index 120b3a00d1..d51e68da91 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/event/ExecutorServiceNotifier.java @@ -10,6 +10,8 @@ */ package org.eclipse.net4j.util.event; +import org.eclipse.net4j.util.concurrent.IExecutorServiceProvider; + import java.util.concurrent.ExecutorService; /** @@ -20,7 +22,7 @@ import java.util.concurrent.ExecutorService; * @apiviz.exclude */ @Deprecated -public class ExecutorServiceNotifier extends Notifier +public class ExecutorServiceNotifier extends Notifier implements IExecutorServiceProvider { private ExecutorService notificationExecutorService; @@ -28,6 +30,14 @@ public class ExecutorServiceNotifier extends Notifier { } + /** + * @since 3.6 + */ + public ExecutorService getExecutorService() + { + return notificationExecutorService; + } + @Override public ExecutorService getNotificationService() { diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java index 3a93c37867..20f8d6a459 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataInputExtender.java @@ -12,13 +12,14 @@ package org.eclipse.net4j.util.io; import org.eclipse.net4j.util.io.ExtendedIOUtil.ClassResolver; +import java.io.Closeable; import java.io.DataInput; import java.io.IOException; /** * @author Eike Stepper */ -public class DataInputExtender implements ExtendedDataInput +public class DataInputExtender implements ExtendedDataInput, Closeable { private DataInput input; @@ -142,4 +143,15 @@ public class DataInputExtender implements ExtendedDataInput { return input.skipBytes(n); } + + /** + * @since 3.6 + */ + public void close() throws IOException + { + if (input instanceof Closeable) + { + ((Closeable)input).close(); + } + } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java index c93eab8b8e..d6192e9e72 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/DataOutputExtender.java @@ -10,13 +10,14 @@ */ package org.eclipse.net4j.util.io; +import java.io.Closeable; import java.io.DataOutput; import java.io.IOException; /** * @author Eike Stepper */ -public class DataOutputExtender implements ExtendedDataOutput +public class DataOutputExtender implements ExtendedDataOutput, Closeable { private DataOutput output; @@ -125,4 +126,15 @@ public class DataOutputExtender implements ExtendedDataOutput { ExtendedIOUtil.writeException(output, t); } + + /** + * @since 3.6 + */ + public void close() throws IOException + { + if (output instanceof Closeable) + { + ((Closeable)output).close(); + } + } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java index f52c4e2983..01fbf088b7 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataInput.java @@ -12,6 +12,7 @@ package org.eclipse.net4j.util.io; import org.eclipse.net4j.util.io.ExtendedIOUtil.ClassResolver; +import java.io.Closeable; import java.io.DataInput; import java.io.EOFException; import java.io.IOException; @@ -48,7 +49,7 @@ public interface ExtendedDataInput extends DataInput * @author Eike Stepper * @since 2.0 */ - public static class Delegating implements ExtendedDataInput + public static class Delegating implements ExtendedDataInput, Closeable { private ExtendedDataInput delegate; @@ -177,6 +178,17 @@ public interface ExtendedDataInput extends DataInput { return delegate.skipBytes(n); } + + /** + * @since 3.6 + */ + public void close() throws IOException + { + if (delegate instanceof Closeable) + { + ((Closeable)delegate).close(); + } + } } /** @@ -209,5 +221,16 @@ public interface ExtendedDataInput extends DataInput return -1; } } + + @Override + public void close() throws IOException + { + if (delegate instanceof Closeable) + { + ((Closeable)delegate).close(); + } + + super.close(); + } } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java index f3d47ce989..74d86e83e1 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/ExtendedDataOutput.java @@ -10,6 +10,7 @@ */ package org.eclipse.net4j.util.io; +import java.io.Closeable; import java.io.DataOutput; import java.io.IOException; import java.io.OutputStream; @@ -41,7 +42,7 @@ public interface ExtendedDataOutput extends DataOutput * @author Eike Stepper * @since 2.0 */ - public static class Delegating implements ExtendedDataOutput + public static class Delegating implements ExtendedDataOutput, Closeable { private ExtendedDataOutput delegate; @@ -155,6 +156,17 @@ public interface ExtendedDataOutput extends DataOutput { delegate.writeUTF(str); } + + /** + * @since 3.6 + */ + public void close() throws IOException + { + if (delegate instanceof Closeable) + { + ((Closeable)delegate).close(); + } + } } /** @@ -180,5 +192,16 @@ public interface ExtendedDataOutput extends DataOutput { delegate.write(b); } + + @Override + public void close() throws IOException + { + if (delegate instanceof Closeable) + { + ((Closeable)delegate).close(); + } + + super.close(); + } } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/CleanableReferenceQueue.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/CleanableReferenceQueue.java new file mode 100644 index 0000000000..4991d90692 --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/CleanableReferenceQueue.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2011-2013, 2015 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + * Simon McDuff - bug 201266 + * Simon McDuff - bug 230832 + */ +package org.eclipse.net4j.util.ref; + +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author Eike Stepper + * @since 3.6 + */ +public abstract class CleanableReferenceQueue<T> extends ReferenceQueue<T> +{ + public static final int ALL_WORK_PER_POLL = ReferenceQueueWorker.ALL_WORK_PER_POLL; + + public static final int DEFAULT_MAX_WORK_PER_POLL = ReferenceQueueWorker.DEFAULT_MAX_WORK_PER_POLL; + + public static final int DEFAULT_POLL_MILLIS = ReferenceQueueWorker.DEFAULT_POLL_MILLIS; + + private final AtomicBoolean cleaning = new AtomicBoolean(); + + private int maxWorkPerPoll; + + private long pollMillis; + + private long lastPoll = System.currentTimeMillis(); + + public CleanableReferenceQueue() + { + setPollMillis(DEFAULT_POLL_MILLIS); + setMaxWorkPerPoll(DEFAULT_MAX_WORK_PER_POLL); + } + + public final long getPollMillis() + { + return pollMillis; + } + + public final void setPollMillis(long pollMillis) + { + this.pollMillis = pollMillis; + } + + public final int getMaxWorkPerPoll() + { + return maxWorkPerPoll; + } + + public final void setMaxWorkPerPoll(int maxWorkPerPoll) + { + this.maxWorkPerPoll = maxWorkPerPoll; + } + + public final void register(T object) + { + clean(); + createReference(object); + } + + public final void clean() + { + if (cleaning.compareAndSet(false, true)) + { + long now = System.currentTimeMillis(); + if (lastPoll + pollMillis > now) + { + int count = maxWorkPerPoll; + if (count == ALL_WORK_PER_POLL) + { + count = Integer.MAX_VALUE; + } + + for (int i = 0; i < count; i++) + { + Reference<? extends T> reference = poll(); + if (reference == null) + { + break; + } + + cleanReference(reference); + } + + lastPoll = now; + } + + cleaning.set(false); + } + } + + protected abstract void cleanReference(Reference<? extends T> reference); + + protected abstract Reference<T> createReference(T object); +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java index 0fc0d05f0f..2598c12aec 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java @@ -25,17 +25,17 @@ public abstract class ReferenceQueueWorker<T> extends Worker /** * @since 3.0 */ - public static final int DEFAULT_POLL_MILLIS = 1000 * 60; // One minute + public static final int ALL_WORK_PER_POLL = -1; /** * @since 3.0 */ - public static final int ALL_WORK_PER_POLL = -1; + public static final int DEFAULT_MAX_WORK_PER_POLL = ALL_WORK_PER_POLL; /** * @since 3.0 */ - public static final int DEFAULT_MAX_WORK_PER_POLL = ALL_WORK_PER_POLL; + public static final int DEFAULT_POLL_MILLIS = 1000 * 60; // One minute private ReferenceQueue<T> queue = new ReferenceQueue<T>(); |