diff options
Diffstat (limited to 'bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java')
-rw-r--r-- | bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java | 247 |
1 files changed, 156 insertions, 91 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java index 2afb3771f..1a65fc8ef 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2017). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.osgi.util.promise; +import static java.util.Objects.requireNonNull; + import java.lang.reflect.InvocationTargetException; import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; @@ -23,11 +25,13 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; @@ -42,18 +46,26 @@ import org.osgi.util.function.Predicate; /** * Promise implementation. - * * <p> * This class is not used directly by clients. Clients should use * {@link Deferred} to create a resolvable {@link Promise}. * * @param <T> The result type associated with the Promise. - * * @ThreadSafe * @author $Id$ */ final class PromiseImpl<T> implements Promise<T> { /** + * The executor to use for callbacks. If {@code null}, the default callback + * executor is used. + */ + private final Executor callbackExecutor; + /** + * The executor to use for scheduled operations. If {@code null}, the + * default scheduled executor is used. + */ + private final ScheduledExecutorService scheduledExecutor; + /** * A ConcurrentLinkedQueue to hold the callbacks for this Promise, so no * additional synchronization is required to write to or read from the * queue. @@ -91,8 +103,17 @@ final class PromiseImpl<T> implements Promise<T> { /** * Initialize this Promise. + * + * @param callbackExecutor The executor to use for callbacks. {@code null} + * can be specified for the default callback executor. + * @param scheduledExecutor The scheduled executor for use for scheduled + * operations. {@code null} can be specified for the default + * scheduled executor. */ - PromiseImpl() { + PromiseImpl(Executor callbackExecutor, + ScheduledExecutorService scheduledExecutor) { + this.callbackExecutor = callbackExecutor; + this.scheduledExecutor = scheduledExecutor; callbacks = new ConcurrentLinkedQueue<>(); resolved = new CountDownLatch(1); } @@ -102,13 +123,21 @@ final class PromiseImpl<T> implements Promise<T> { * * @param v The value of this resolved Promise. * @param f The failure of this resolved Promise. + * @param callbackExecutor The executor to use for callbacks. {@code null} + * can be specified for the default callback executor. + * @param scheduledExecutor The scheduled executor for use for scheduled + * operations. {@code null} can be specified for the default + * scheduled executor. */ - PromiseImpl(T v, Throwable f) { + PromiseImpl(T v, Throwable f, Executor callbackExecutor, + ScheduledExecutorService scheduledExecutor) { if (f == null) { value = v; } else { fail = f; } + this.callbackExecutor = callbackExecutor; + this.scheduledExecutor = scheduledExecutor; callbacks = new ConcurrentLinkedQueue<>(); resolved = new CountDownLatch(0); } @@ -164,24 +193,6 @@ final class PromiseImpl<T> implements Promise<T> { } /** - * Call any registered callbacks if this Promise is resolved. - */ - private void notifyCallbacks() { - if (resolved.getCount() != 0) { - return; // return if not resolved - } - - /* - * Note: multiple threads can be in this method removing callbacks from - * the queue and executing them, so the order in which callbacks are - * executed cannot be specified. - */ - for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) { - Callbacks.execute(callback); - } - } - - /** * {@inheritDoc} */ @Override @@ -245,17 +256,65 @@ final class PromiseImpl<T> implements Promise<T> { } /** + * Call any registered callbacks if this Promise is resolved. + */ + private void notifyCallbacks() { + if (resolved.getCount() != 0) { + return; // return if not resolved + } + + /* + * Note: multiple threads can be in this method removing callbacks from + * the queue and executing them, so the order in which callbacks are + * executed cannot be specified. + */ + Executor executor = callbackExecutor; + for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) { + if (executor == null) { + executor = DefaultExecutors.callbackExecutor(); + } + try { + try { + executor.execute(callback); + } catch (RejectedExecutionException e) { + callback.run(); + } + } catch (Throwable t) { + uncaughtException(t); + } + } + } + + /** + * Handle an uncaught exception from a Runnable. + * + * @param t The uncaught exception. + * @since 1.1 + */ + static void uncaughtException(Throwable t) { + try { + Thread thread = Thread.currentThread(); + thread.getUncaughtExceptionHandler().uncaughtException(thread, t); + } catch (Throwable ignored) { + // we ignore this + } + } + + /** * {@inheritDoc} */ @Override public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) { - PromiseImpl<R> chained = new PromiseImpl<>(); + PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); onResolve(chained.new Then<>(this, success, failure)); return chained; } /** * {@inheritDoc} + * + * @since 1.1 */ @Override public <R> Promise<R> then(Success<? super T, ? extends R> success) { @@ -263,6 +322,17 @@ final class PromiseImpl<T> implements Promise<T> { } /** + * {@inheritDoc} + */ + @Override + public Promise<T> then(Callback callback) { + PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); + onResolve(chained.new Chain(this, callback)); + return chained; + } + + /** * A callback used to chain promises for the {@link #then(Success, Failure)} * method. * @@ -357,16 +427,6 @@ final class PromiseImpl<T> implements Promise<T> { } /** - * {@inheritDoc} - */ - @Override - public Promise<T> then(Callback callback) { - PromiseImpl<T> chained = new PromiseImpl<>(); - onResolve(chained.new Chain(this, callback)); - return chained; - } - - /** * Resolve this Promise with the specified Promise. * * <p> @@ -386,7 +446,8 @@ final class PromiseImpl<T> implements Promise<T> { * resolved. */ Promise<Void> resolveWith(Promise<? extends T> with) { - PromiseImpl<Void> chained = new PromiseImpl<>(); + PromiseImpl<Void> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); with.onResolve(new ResolveWith(with, chained)); return chained; } @@ -424,7 +485,8 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> filter(Predicate<? super T> predicate) { - PromiseImpl<T> chained = new PromiseImpl<>(); + PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); onResolve(chained.new Filter(this, predicate)); return chained; } @@ -464,7 +526,8 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public <R> Promise<R> map(Function<? super T, ? extends R> mapper) { - PromiseImpl<R> chained = new PromiseImpl<>(); + PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); onResolve(chained.new Map<>(this, mapper)); return chained; } @@ -504,7 +567,8 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) { - PromiseImpl<R> chained = new PromiseImpl<>(); + PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); onResolve(chained.new FlatMap<>(this, mapper)); return chained; } @@ -548,7 +612,8 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) { - PromiseImpl<T> chained = new PromiseImpl<>(); + PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); onResolve(chained.new Recover(this, recovery)); return chained; } @@ -591,7 +656,8 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) { - PromiseImpl<T> chained = new PromiseImpl<>(); + PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); onResolve(chained.new RecoverWith(this, recovery)); return chained; } @@ -635,7 +701,8 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> fallbackTo(Promise<? extends T> fallback) { - PromiseImpl<T> chained = new PromiseImpl<>(); + PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); onResolve(chained.new FallbackTo(this, fallback)); return chained; } @@ -666,13 +733,37 @@ final class PromiseImpl<T> implements Promise<T> { } /** + * Schedule a operation on the scheduled executor. + * + * @since 1.1 + */ + ScheduledFuture< ? > schedule(Runnable operation, long delay, + TimeUnit unit) { + ScheduledExecutorService executor = scheduledExecutor; + if (executor == null) { + executor = DefaultExecutors.scheduledExecutor(); + } + try { + try { + return executor.schedule(operation, delay, unit); + } catch (RejectedExecutionException e) { + operation.run(); + } + } catch (Throwable t) { + uncaughtException(t); + } + return null; + } + + /** * {@inheritDoc} * * @since 1.1 */ @Override public Promise<T> timeout(long millis) { - PromiseImpl<T> chained = new PromiseImpl<>(); + PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); if (!isDone()) { onResolve(chained.new Timeout(millis, TimeUnit.MILLISECONDS)); } @@ -691,7 +782,7 @@ final class PromiseImpl<T> implements Promise<T> { private final ScheduledFuture< ? > future; Timeout(long timeout, TimeUnit unit) { - future = Callbacks.schedule(new TimeoutAction(), timeout, unit); + future = schedule(new TimeoutAction(), timeout, unit); } @Override @@ -701,15 +792,16 @@ final class PromiseImpl<T> implements Promise<T> { } } } - + /** * Callback used to fail the Promise if the timeout expires. * * @Immutable + * @since 1.1 */ private final class TimeoutAction implements Runnable { TimeoutAction() {} - + @Override public void run() { tryResolve(null, new TimeoutException()); @@ -723,7 +815,8 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> delay(long millis) { - PromiseImpl<T> chained = new PromiseImpl<>(); + PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, + scheduledExecutor); onResolve(new Delay(chained.new Chain(this), millis, TimeUnit.MILLISECONDS)); return chained; @@ -736,76 +829,54 @@ final class PromiseImpl<T> implements Promise<T> { * @Immutable * @since 1.1 */ - private static final class Delay implements Runnable { - private final Runnable callback; + private final class Delay implements Runnable { + private final Runnable operation; private final long delay; private final TimeUnit unit; - Delay(Runnable callback, long delay, TimeUnit unit) { - this.callback = callback; + Delay(Runnable operation, long delay, TimeUnit unit) { + this.operation = operation; this.delay = delay; this.unit = unit; } @Override public void run() { - Callbacks.schedule(callback, delay, unit); + schedule(operation, delay, unit); } } /** - * Callback handler used to asynchronously execute callbacks. + * Default executors for callbacks. * * @Immutable * @since 1.1 */ - private static final class Callbacks + private static final class DefaultExecutors implements ThreadFactory, RejectedExecutionHandler, Runnable { - private static final Callbacks callbacks; + private static final DefaultExecutors callbacks; private static final ScheduledExecutor scheduledExecutor; private static final ThreadPoolExecutor callbackExecutor; static { - callbacks = new Callbacks(); + callbacks = new DefaultExecutors(); scheduledExecutor = new ScheduledExecutor(2, callbacks); callbackExecutor = new ThreadPoolExecutor(0, 64, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callbacks, callbacks); } - /** - * Schedule a callback on the scheduled executor - */ - static ScheduledFuture< ? > schedule(Runnable callback, long delay, - TimeUnit unit) { - try { - return scheduledExecutor.schedule(callback, delay, unit); - } catch (RejectedExecutionException e) { - callbacks.rejectedExecution(callback, scheduledExecutor); - return null; - } + static Executor callbackExecutor() { + return callbackExecutor; } - /** - * Execute a callback on the callback executor - */ - static void execute(Runnable callback) { - callbackExecutor.execute(callback); - } - - static void uncaughtException(Throwable t) { - try { - Thread thread = Thread.currentThread(); - thread.getUncaughtExceptionHandler().uncaughtException(thread, - t); - } catch (Throwable ignored) { - // we ignore this - } + static ScheduledExecutorService scheduledExecutor() { + return scheduledExecutor; } private final AtomicBoolean shutdownHookInstalled; private final ThreadFactory delegateThreadFactory; - private Callbacks() { + private DefaultExecutors() { shutdownHookInstalled = new AtomicBoolean(); delegateThreadFactory = Executors.defaultThreadFactory(); } @@ -929,13 +1000,14 @@ final class PromiseImpl<T> implements Promise<T> { * A holder of the result of a promise. * * @NotThreadSafe + * @since 1.1 */ static final class Result<P> { Throwable fail; P value; - + Result() {} - + static <R> Result<R> collect(Promise< ? extends R> promise) { Result<R> result = new Result<>(); final boolean interrupted = Thread.interrupted(); @@ -954,11 +1026,4 @@ final class PromiseImpl<T> implements Promise<T> { return result; } } - - static <V> V requireNonNull(V value) { - if (value != null) { - return value; - } - throw new NullPointerException(); - } } |