diff options
Diffstat (limited to 'bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java')
-rw-r--r-- | bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java | 474 |
1 files changed, 474 insertions, 0 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java new file mode 100644 index 000000000..09c0c038f --- /dev/null +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java @@ -0,0 +1,474 @@ +/* + * Copyright (c) OSGi Alliance (2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.promise; + +import static org.osgi.util.promise.PromiseImpl.uncaughtException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.osgi.annotation.versioning.ConsumerType; +import org.osgi.util.promise.PromiseImpl.Result; + +/** + * Promise factory to create Deferred and Promise objects. + * <p> + * Instances of this class can be used to create Deferred and Promise objects + * which use the executors used to construct this object for any callback or + * scheduled operation execution. + * + * @Immutable + * @author $Id$ + * @since 1.1 + */ +@ConsumerType +public class PromiseFactory { + /** + * The default factory which uses the default callback executor and default + * scheduled executor. + */ + final static PromiseFactory defaultFactory = new PromiseFactory( + null, null); + + /** + * The executor to use for callbacks. If {@code null}, the default + * callback executor is used. + */ + private final Executor callbackExecutor; + /** + * The executor to use for scheduled operations. If {@code null}, the + * default scheduled executor is used. + */ + private final ScheduledExecutorService scheduledExecutor; + + + /** + * Create a new PromiseFactory with the specified callback executor. + * <p> + * The default scheduled executor will be used. + * + * @param callbackExecutor The executor to use for callbacks. {@code null} + * can be specified for the default callback executor. + */ + public PromiseFactory(Executor callbackExecutor) { + this(callbackExecutor, null); + } + + /** + * Create a new PromiseFactory with the specified callback executor and + * specified scheduled executor. + * + * @param callbackExecutor The executor to use for callbacks. {@code null} + * can be specified for the default callback executor. + * @param scheduledExecutor The scheduled executor for use for scheduled + * operations. {@code null} can be specified for the default + * scheduled executor. + */ + public PromiseFactory(Executor callbackExecutor, + ScheduledExecutorService scheduledExecutor) { + this.callbackExecutor = callbackExecutor; + this.scheduledExecutor = scheduledExecutor; + } + + /** + * Returns the executor to use for callbacks. + * + * @return The executor to use for callbacks. This will be the default + * callback executor if {@code null} was specified for the callback + * executor when this PromiseFactory was created. + */ + public Executor executor() { + if (callbackExecutor == null) { + return DefaultExecutors.callbackExecutor(); + } + return callbackExecutor; + } + + /** + * Returns the scheduled executor to use for scheduled operations. + * + * @return The scheduled executor to use for scheduled operations. This will + * be the default scheduled executor if {@code null} was specified + * for the scheduled executor when this PromiseFactory was created. + */ + public ScheduledExecutorService scheduledExecutor() { + if (scheduledExecutor == null) { + return DefaultExecutors.scheduledExecutor(); + } + return scheduledExecutor; + } + + /** + * Create a new Deferred with the callback executor and scheduled executor + * of this PromiseFactory object. + * <p> + * Use this method instead of {@link Deferred#Deferred()} to create a new + * {@link Deferred} whose associated Promise uses executors other than the + * default executors. + * + * @param <T> The value type associated with the returned Deferred. + * @return A new {@link Deferred} with the callback and scheduled executors + * of this PromiseFactory object + */ + public <T> Deferred<T> deferred() { + return new Deferred<>(this); + } + + /** + * Returns a new Promise that has been resolved with the specified value. + * <p> + * The returned Promise uses the callback executor and scheduled executor of + * this PromiseFactory object + * <p> + * Use this method instead of {@link Promises#resolved(Object)} to create a + * Promise which uses executors other than the default executors. + * + * @param <T> The value type associated with the returned Promise. + * @param value The value of the resolved Promise. + * @return A new Promise that has been resolved with the specified value. + */ + public <T> Promise<T> resolved(T value) { + return new ResolvedPromiseImpl<>(value, this); + } + + /** + * Returns a new Promise that has been resolved with the specified failure. + * <p> + * The returned Promise uses the callback executor and scheduled executor of + * this PromiseFactory object + * <p> + * Use this method instead of {@link Promises#failed(Throwable)} to create a + * Promise which uses executors other than the default executors. + * + * @param <T> The value type associated with the returned Promise. + * @param failure The failure of the resolved Promise. Must not be + * {@code null}. + * @return A new Promise that has been resolved with the specified failure. + */ + public <T> Promise<T> failed(Throwable failure) { + return new FailedPromiseImpl<>(failure, this); + } + + /** + * Returns a new Promise that will hold the result of the specified task. + * <p> + * The returned Promise uses the callback executor and scheduled executor of + * this PromiseFactory object + * <p> + * The specified task will be executed on the {@link #executor() callback + * executor}. + * + * @param <T> The value type associated with the returned Promise. + * @param task The task whose result will be available from the returned + * Promise. + * @return A new Promise that will hold the result of the specified task. + */ + public <T> Promise<T> submit(Callable< ? extends T> task) { + DeferredPromiseImpl<T> promise = new DeferredPromiseImpl<>(this); + Runnable submit = promise.new Submit(task); + try { + executor().execute(submit); + } catch (Exception t) { + promise.tryResolve(null, t); + } + return promise; + } + + /** + * Returns a new Promise that is a latch on the resolution of the specified + * Promises. + * <p> + * The returned Promise uses the callback executor and scheduled executor of + * this PromiseFactory object + * <p> + * The returned Promise acts as a gate and must be resolved after all of the + * specified Promises are resolved. + * + * @param <T> The value type of the List value associated with the returned + * Promise. + * @param <S> A subtype of the value type of the List value associated with + * the returned Promise. + * @param promises The Promises which must be resolved before the returned + * Promise must be resolved. Must not be {@code null} and all of + * the elements in the collection must not be {@code null}. + * @return A Promise that must be successfully resolved with a List of the + * values in the order of the specified Promises if all the + * specified Promises are successfully resolved. The List in the + * returned Promise is the property of the caller and is modifiable. + * The returned Promise must be resolved with a failure of + * {@link FailedPromisesException} if any of the specified Promises + * are resolved with a failure. The failure + * {@link FailedPromisesException} must contain all of the specified + * Promises which resolved with a failure. + */ + public <T, S extends T> Promise<List<T>> all( + Collection<Promise<S>> promises) { + if (promises.isEmpty()) { + List<T> result = new ArrayList<>(); + return resolved(result); + } + + DeferredPromiseImpl<List<T>> promise = new DeferredPromiseImpl<>(this); + /* make a copy and capture the ordering */ + List<Promise<S>> list = new ArrayList<>(promises); + All<T,S> all = new All<>(promise, list); + for (Promise<S> p : list) { + p.onResolve(all); + } + return promise; + } + + /** + * A callback used to resolve the specified Promise when the specified list + * of Promises are resolved for the {@link PromiseFactory#all(Collection)} + * method. + * + * @ThreadSafe + */ + private static final class All<T, S extends T> implements Runnable { + private final DeferredPromiseImpl<List<T>> promise; + private final List<Promise<S>> promises; + private final AtomicInteger promiseCount; + + All(DeferredPromiseImpl<List<T>> promise, + List<Promise<S>> promises) { + this.promise = promise; + this.promises = promises; + this.promiseCount = new AtomicInteger(promises.size()); + } + + @Override + public void run() { + if (promiseCount.decrementAndGet() != 0) { + return; + } + List<T> value = new ArrayList<>(promises.size()); + List<Promise< ? >> failed = new ArrayList<>(promises.size()); + Throwable cause = null; + for (Promise<S> p : promises) { + Result<S> result = PromiseImpl.collect(p); + if (result.fail != null) { + failed.add(p); + if (cause == null) { + cause = result.fail; + } + } else { + value.add(result.value); + } + } + if (failed.isEmpty()) { + promise.tryResolve(value, null); + } else { + promise.tryResolve(null, + new FailedPromisesException(failed, cause)); + } + } + } + + /** + * Returns an Executor implementation that executes tasks immediately on the + * thread calling the {@code Executor.execute} method. + * + * @return An Executor implementation that executes tasks immediately on the + * thread calling the {@code Executor.execute} method. + */ + public static Executor inlineExecutor() { + return new InlineExecutor(); + } + + /** + * An Executor implementation which executes the task immediately on the + * thread calling the {@code Executor.execute} method. + * + * @Immutable + */ + private static final class InlineExecutor implements Executor { + InlineExecutor() {} + + @Override + public void execute(Runnable callback) { + callback.run(); + } + } + + /** + * Default executors for Promises. + * + * @Immutable + */ + private static final class DefaultExecutors + implements ThreadFactory, RejectedExecutionHandler, Runnable { + private static final DefaultExecutors callbacks; + private static final ScheduledExecutor scheduledExecutor; + private static final ThreadPoolExecutor callbackExecutor; + static { + callbacks = new DefaultExecutors(); + scheduledExecutor = new ScheduledExecutor(2, callbacks); + callbackExecutor = new ThreadPoolExecutor(0, 64, 60L, + TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + callbacks, callbacks); + } + + static Executor callbackExecutor() { + return callbackExecutor; + } + + static ScheduledExecutorService scheduledExecutor() { + return scheduledExecutor; + } + + private final AtomicBoolean shutdownHookInstalled; + private final ThreadFactory delegateThreadFactory; + + private DefaultExecutors() { + shutdownHookInstalled = new AtomicBoolean(); + delegateThreadFactory = Executors.defaultThreadFactory(); + } + + /** + * Executor threads should not prevent VM from exiting + */ + @Override + public Thread newThread(Runnable r) { + if (shutdownHookInstalled.compareAndSet(false, true)) { + Thread shutdownThread = delegateThreadFactory.newThread(this); + shutdownThread.setName( + "ExecutorShutdownHook," + shutdownThread.getName()); + try { + Runtime.getRuntime().addShutdownHook(shutdownThread); + } catch (IllegalStateException e) { + // VM is already shutting down... + callbackExecutor.shutdown(); + scheduledExecutor.shutdown(); + } + } + Thread t = delegateThreadFactory.newThread(r); + t.setName("PromiseFactory," + t.getName()); + t.setDaemon(true); + return t; + } + + /** + * Call the callback using the caller's thread because the thread pool + * rejected the execution. + */ + @Override + public void rejectedExecution(Runnable callback, + ThreadPoolExecutor executor) { + try { + callback.run(); + } catch (Throwable t) { + uncaughtException(t); + } + } + + /** + * Shutdown hook + */ + @Override + public void run() { + // limit new thread creation + callbackExecutor.setMaximumPoolSize( + Math.max(1, callbackExecutor.getPoolSize())); + // Run all delayed callbacks now + scheduledExecutor.shutdown(); + BlockingQueue<Runnable> queue = scheduledExecutor.getQueue(); + if (!queue.isEmpty()) { + for (Object r : queue.toArray()) { + if (r instanceof RunnableScheduledFuture< ? >) { + RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r; + if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L) + && queue.remove(future)) { + future.run(); + scheduledExecutor.afterExecute(future, null); + } + } + } + scheduledExecutor.shutdown(); + } + try { + scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + // Shutdown callback executor + callbackExecutor.shutdown(); + try { + callbackExecutor.awaitTermination(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * ScheduledThreadPoolExecutor for scheduled execution. + * + * @ThreadSafe + */ + private static final class ScheduledExecutor + extends ScheduledThreadPoolExecutor { + ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + } + + /** + * Handle uncaught exceptions + */ + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if ((t == null) && (r instanceof Future< ? >)) { + boolean interrupted = Thread.interrupted(); + try { + ((Future< ? >) r).get(); + } catch (CancellationException e) { + // ignore + } catch (InterruptedException e) { + interrupted = true; + } catch (ExecutionException e) { + t = e.getCause(); + } finally { + if (interrupted) { // restore interrupt status + Thread.currentThread().interrupt(); + } + } + } + if (t != null) { + uncaughtException(t); + } + } + } + } +} |