diff options
author | Thomas Watson | 2017-11-13 18:59:51 +0000 |
---|---|---|
committer | Thomas Watson | 2017-11-13 19:04:00 +0000 |
commit | 7ac54c883ba1aa4dc3e0bdfa981f090fbc3bcb1c (patch) | |
tree | 43440d7c400a4f0331cb74c47803e2893d7fe270 | |
parent | af9796c48cdef3e4f83fbd382fbb69b4441267c4 (diff) | |
download | rt.equinox.framework-7ac54c883ba1aa4dc3e0bdfa981f090fbc3bcb1c.tar.gz rt.equinox.framework-7ac54c883ba1aa4dc3e0bdfa981f090fbc3bcb1c.tar.xz rt.equinox.framework-7ac54c883ba1aa4dc3e0bdfa981f090fbc3bcb1c.zip |
Bug 527221 - [osgi R7] update log stream impl and APII20171115-0115I20171115-0025I20171114-2000I20171113-2000
Change-Id: Id5a0c165285c6b0bf496c948f26616161f5df4ab
Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
-rw-r--r-- | bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Consumer.java (renamed from bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java) | 12 | ||||
-rw-r--r-- | bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java | 40 | ||||
-rw-r--r-- | bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java | 113 | ||||
-rw-r--r-- | bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseExecutors.java | 383 | ||||
-rw-r--r-- | bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java | 611 | ||||
-rw-r--r-- | bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java | 44 |
6 files changed, 780 insertions, 423 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Consumer.java index 17ff376bc..3636c6043 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Consumer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2016). All Rights Reserved. + * Copyright (c) OSGi Alliance (2017). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,22 +19,24 @@ package org.osgi.util.function; import org.osgi.annotation.versioning.ConsumerType; /** - * A callback that performs an operation and may throw an exception. + * A function that accepts a single argument and produces no result. * <p> * This is a functional interface and can be used as the assignment target for a * lambda expression or method reference. * + * @param <T> The type of the function input. * @ThreadSafe * @since 1.1 * @author $Id$ */ @ConsumerType @FunctionalInterface -public interface Callback { +public interface Consumer<T> { /** - * Execute the callback. + * Applies this function to the specified argument. * + * @param t The input to this function. * @throws Exception An exception thrown by the method. */ - void run() throws Exception; + void accept(T t) throws Exception; } diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java index 25c7442d3..e2c26cb80 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java @@ -18,8 +18,7 @@ package org.osgi.util.promise; import static java.util.Objects.requireNonNull; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; +import org.osgi.annotation.versioning.ProviderType; /** * A Deferred Promise resolution. @@ -41,46 +40,35 @@ import java.util.concurrent.ScheduledExecutorService; * @Immutable * @author $Id$ */ +@ProviderType public class Deferred<T> { - private final PromiseImpl<T> promise; - /** - * Create a new Deferred. - * <p> - * The default callback executor and default scheduled executor will be - * used. + * The Promise associated with this Deferred. */ - public Deferred() { - this(null, null); - } + private final PromiseImpl<T> promise; /** - * Create a new Deferred with the specified callback executor. + * Create a new Deferred. * <p> - * The default scheduled executor will be used. + * The {@link #getPromise() associated promise} will use the default + * callback executor and default scheduled executor. * - * @param callbackExecutor The executor to use for callbacks. {@code null} - * can be specified for the default callback executor. - * @since 1.1 + * @see PromiseExecutors#deferred() */ - public Deferred(Executor callbackExecutor) { - this(callbackExecutor, null); + public Deferred() { + this(PromiseExecutors.defaultExecutors); } /** * Create a new Deferred with the specified callback and scheduled * executors. * - * @param callbackExecutor The executor to use for callbacks. {@code null} - * can be specified for the default callback executor. - * @param scheduledExecutor The scheduled executor for use for scheduled - * operations. {@code null} can be specified for the default - * scheduled executor. + * @param executors The executors to use for callbacks and scheduled + * operations. * @since 1.1 */ - public Deferred(Executor callbackExecutor, - ScheduledExecutorService scheduledExecutor) { - promise = new PromiseImpl<>(callbackExecutor, scheduledExecutor); + Deferred(PromiseExecutors executors) { + promise = new PromiseImpl<>(executors); } /** diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java index f9e4ef5d2..a2082aef3 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java @@ -19,7 +19,7 @@ package org.osgi.util.promise; import java.lang.reflect.InvocationTargetException; import org.osgi.annotation.versioning.ProviderType; -import org.osgi.util.function.Callback; +import org.osgi.util.function.Consumer; import org.osgi.util.function.Function; import org.osgi.util.function.Predicate; @@ -115,41 +115,85 @@ public interface Promise<T> { /** * Register a callback to be called when this Promise is resolved. - * * <p> * The specified callback is called when this Promise is resolved either * successfully or with a failure. - * * <p> * This method may be called at any time including before and after this * Promise has been resolved. + * <p> + * Resolving this Promise <i>happens-before</i> any registered callback is + * called. That is, in a registered callback, {@link #isDone()} must return + * {@code true} and {@link #getValue()} and {@link #getFailure()} must not + * block. + * <p> + * A callback may be called on a different thread than the thread which + * registered the callback. So the callback must be thread safe but can rely + * upon that the registration of the callback <i>happens-before</i> the + * registered callback is called. * + * @param callback The callback to be called when this Promise is resolved. + * Must not be {@code null}. + * @return This Promise. + */ + Promise<T> onResolve(Runnable callback); + + /** + * Register a callback to be called with the result of this Promise when + * this Promise is resolved successfully. The callback will not be called if + * this Promise is resolved with a failure. + * <p> + * This method may be called at any time including before and after this + * Promise has been resolved. * <p> * Resolving this Promise <i>happens-before</i> any registered callback is * called. That is, in a registered callback, {@link #isDone()} must return * {@code true} and {@link #getValue()} and {@link #getFailure()} must not * block. + * <p> + * A callback may be called on a different thread than the thread which + * registered the callback. So the callback must be thread safe but can rely + * upon that the registration of the callback <i>happens-before</i> the + * registered callback is called. * + * @param success The Consumer callback that receives the the value of this + * Promise. Must not be {@code null}. + * @return This Promise. + * @since 1.1 + */ + Promise<T> onSuccess(Consumer< ? super T> success); + + /** + * Register a callback to be called with the failure for this Promise when + * this Promise is resolved with a failure. The callback will not be called + * if this Promise is resolved successfully. + * <p> + * This method may be called at any time including before and after this + * Promise has been resolved. + * <p> + * Resolving this Promise <i>happens-before</i> any registered callback is + * called. That is, in a registered callback, {@link #isDone()} must return + * {@code true} and {@link #getValue()} and {@link #getFailure()} must not + * block. * <p> * A callback may be called on a different thread than the thread which * registered the callback. So the callback must be thread safe but can rely * upon that the registration of the callback <i>happens-before</i> the * registered callback is called. * - * @param callback A callback to be called when this Promise is resolved. - * Must not be {@code null}. + * @param failure The Consumer callback that receives the the failure of + * this Promise. Must not be {@code null}. * @return This Promise. + * @since 1.1 */ - Promise<T> onResolve(Runnable callback); + Promise<T> onFailure(Consumer< ? super Throwable> failure); /** * Chain a new Promise to this Promise with Success and Failure callbacks. - * * <p> * The specified {@link Success} callback is called when this Promise is * successfully resolved and the specified {@link Failure} callback is * called when this Promise is resolved with a failure. - * * <p> * This method returns a new Promise which is chained to this Promise. The * returned Promise must be resolved when this Promise is resolved after the @@ -157,24 +201,20 @@ public interface Promise<T> { * executed callback must be used to resolve the returned Promise. Multiple * calls to this method can be used to create a chain of promises which are * resolved in sequence. - * * <p> * If this Promise is successfully resolved, the Success callback is * executed and the result Promise, if any, or thrown exception is used to * resolve the returned Promise from this method. If this Promise is * resolved with a failure, the Failure callback is executed and the * returned Promise from this method is failed. - * * <p> * This method may be called at any time including before and after this * Promise has been resolved. - * * <p> * Resolving this Promise <i>happens-before</i> any registered callback is * called. That is, in a registered callback, {@link #isDone()} must return * {@code true} and {@link #getValue()} and {@link #getFailure()} must not * block. - * * <p> * A callback may be called on a different thread than the thread which * registered the callback. So the callback must be thread safe but can rely @@ -182,14 +222,14 @@ public interface Promise<T> { * registered callback is called. * * @param <R> The value type associated with the returned Promise. - * @param success A Success callback to be called when this Promise is - * successfully resolved. May be {@code null} if no Success callback - * is required. In this case, the returned Promise must be resolved - * with the value {@code null} when this Promise is successfully - * resolved. - * @param failure A Failure callback to be called when this Promise is - * resolved with a failure. May be {@code null} if no Failure - * callback is required. + * @param success The Success callback to be called when this Promise is + * successfully resolved. May be {@code null} if no Success + * callback is required. In this case, the returned Promise must + * be resolved with the value {@code null} when this Promise is + * successfully resolved. + * @param failure The Failure callback to be called when this Promise is + * resolved with a failure. May be {@code null} if no Failure + * callback is required. * @return A new Promise which is chained to this Promise. The returned * Promise must be resolved when this Promise is resolved after the * specified Success or Failure callback, if any, is executed. @@ -198,18 +238,17 @@ public interface Promise<T> { /** * Chain a new Promise to this Promise with a Success callback. - * * <p> * This method performs the same function as calling * {@link #then(Success, Failure)} with the specified Success callback and * {@code null} for the Failure callback. * * @param <R> The value type associated with the returned Promise. - * @param success A Success callback to be called when this Promise is - * successfully resolved. May be {@code null} if no Success callback - * is required. In this case, the returned Promise must be resolved - * with the value {@code null} when this Promise is successfully - * resolved. + * @param success The Success callback to be called when this Promise is + * successfully resolved. May be {@code null} if no Success + * callback is required. In this case, the returned Promise must + * be resolved with the value {@code null} when this Promise is + * successfully resolved. * @return A new Promise which is chained to this Promise. The returned * Promise must be resolved when this Promise is resolved after the * specified Success, if any, is executed. @@ -218,16 +257,17 @@ public interface Promise<T> { <R> Promise<R> then(Success<? super T, ? extends R> success); /** - * Chain a new Promise to this Promise with a callback. + * Chain a new Promise to this Promise with a Consumer callback that + * receives the value of this Promise when it is successfully resolved. * <p> - * The specified {@link Callback} is called when this Promise is resolved - * either successfully or with a failure. + * The specified {@link Consumer} is called when this Promise is resolved + * successfully. * <p> * This method returns a new Promise which is chained to this Promise. The * returned Promise must be resolved when this Promise is resolved after the * specified callback is executed. If the callback throws an exception, the * returned Promise is failed with that exception. Otherwise the returned - * Promise is resolved with this Promise. + * Promise is resolved with the success value from this Promise. * <p> * This method may be called at any time including before and after this * Promise has been resolved. @@ -242,18 +282,17 @@ public interface Promise<T> { * upon that the registration of the callback <i>happens-before</i> the * registered callback is called. * - * @param callback A callback to be called when this Promise is resolved. - * Must not be {@code null}. + * @param consumer The Consumer callback that receives the the value of this + * Promise. Must not be {@code null}. * @return A new Promise which is chained to this Promise. The returned * Promise must be resolved when this Promise is resolved after the - * specified callback is executed. + * specified Consumer is executed. * @since 1.1 */ - Promise<T> then(Callback callback); + Promise<T> thenAccept(Consumer< ? super T> consumer); /** * Filter the value of this Promise. - * * <p> * If this Promise is successfully resolved, the returned Promise must * either be resolved with the value of this Promise, if the specified @@ -261,17 +300,15 @@ public interface Promise<T> { * {@code NoSuchElementException}, if the specified Predicate does not * accept that value. If the specified Predicate throws an exception, the * returned Promise must be failed with the exception. - * * <p> * If this Promise is resolved with a failure, the returned Promise must be * failed with that failure. - * * <p> * This method may be called at any time including before and after this * Promise has been resolved. * * @param predicate The Predicate to evaluate the value of this Promise. - * Must not be {@code null}. + * Must not be {@code null}. * @return A Promise that filters the value of this Promise. */ Promise<T> filter(Predicate<? super T> predicate); diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseExecutors.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseExecutors.java new file mode 100644 index 000000000..488ca7bac --- /dev/null +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseExecutors.java @@ -0,0 +1,383 @@ +/* + * Copyright (c) OSGi Alliance (2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.promise; + +import static java.util.Objects.requireNonNull; +import static org.osgi.util.promise.PromiseImpl.uncaughtException; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.osgi.annotation.versioning.ConsumerType; + +/** + * The executors for Promise callbacks and scheduled operations. + * <p> + * Instances of this class can be used to create a Deferred that can be resolved + * in the future as well as resolved Promises. The returned Deferred and Promise + * objects all use the executors used to construct this object for any callback + * or scheduled operation execution. + * + * @Immutable + * @author $Id$ + * @since 1.1 + */ +@ConsumerType +public class PromiseExecutors { + /** + * The default executors. + */ + final static PromiseExecutors defaultExecutors = new PromiseExecutors( + null, null); + + /** + * The executor to use for callbacks. If {@code null}, the default + * callback executor is used. + */ + private final Executor callbackExecutor; + /** + * The executor to use for scheduled operations. If {@code null}, the + * default scheduled executor is used. + */ + private final ScheduledExecutorService scheduledExecutor; + + + /** + * Create a new PromiseExecutors with the specified callback executor. + * <p> + * The default scheduled executor will be used. + * + * @param callbackExecutor The executor to use for callbacks. {@code null} + * can be specified for the default callback executor. + */ + public PromiseExecutors(Executor callbackExecutor) { + this(callbackExecutor, null); + } + + /** + * Create a new PromiseExecutors with the specified callback executor and + * specified scheduled executor. + * + * @param callbackExecutor The executor to use for callbacks. {@code null} + * can be specified for the default callback executor. + * @param scheduledExecutor The scheduled executor for use for scheduled + * operations. {@code null} can be specified for the default + * scheduled executor. + */ + public PromiseExecutors(Executor callbackExecutor, + ScheduledExecutorService scheduledExecutor) { + this.callbackExecutor = callbackExecutor; + this.scheduledExecutor = scheduledExecutor; + } + + /** + * Returns the executor to use for callbacks. + * + * @return The executor to use for callbacks. This will be the default + * callback executor if {@code null} was specified for the callback + * executor when this PromiseExecutors was created. + */ + protected Executor executor() { + if (callbackExecutor == null) { + return DefaultExecutors.callbackExecutor(); + } + return callbackExecutor; + } + + /** + * Returns the executor to use for scheduled operations. + * + * @return The executor to use for scheduled operations. This will be the + * default scheduled executor if {@code null} was specified for the + * scheduled executor when this PromiseExecutors was created. + */ + protected ScheduledExecutorService scheduledExecutor() { + if (scheduledExecutor == null) { + return DefaultExecutors.scheduledExecutor(); + } + return scheduledExecutor; + } + + /** + * Create a new Deferred with the callback executor and scheduled executor + * of this PromiseExecutors object. + * <p> + * Use this method instead of {@link Deferred#Deferred()} to create a new + * {@link Deferred} whose associated Promise uses executors other than the + * default executors. + * + * @return A new {@link Deferred} with the callback and scheduled executors + * of this PromiseExecutors object + */ + public <T> Deferred<T> deferred() { + return new Deferred<>(this); + } + + /** + * Returns a new Promise that has been resolved with the specified value. + * <p> + * The returned Promise uses the callback executor and scheduled executor of + * this PromiseExecutors object + * <p> + * Use this method instead of {@link Promises#resolved(Object)} to create a + * Promise which uses executors other than the default executors. + * + * @param <T> The value type associated with the returned Promise. + * @param value The value of the resolved Promise. + * @return A new Promise that has been resolved with the specified value. + */ + public <T> Promise<T> resolved(T value) { + return new PromiseImpl<>(value, null, this); + } + + /** + * Returns a new Promise that has been resolved with the specified failure. + * <p> + * The returned Promise uses the callback executor and scheduled executor of + * this PromiseExecutors object + * <p> + * Use this method instead of {@link Promises#failed(Throwable)} to create a + * Promise which uses executors other than the default executors. + * + * @param <T> The value type associated with the returned Promise. + * @param failure The failure of the resolved Promise. Must not be + * {@code null}. + * @return A new Promise that has been resolved with the specified failure. + */ + public <T> Promise<T> failed(Throwable failure) { + return new PromiseImpl<>(null, requireNonNull(failure), this); + } + + /** + * Returns a new Promise that will hold the result of the specified task. + * <p> + * The specified task will be executed on the {@link #executor() callback + * executor}. + * + * @param task The task whose result will be available from the returned + * Promise. + * @return A new Promise that will hold the result of the specified task. + */ + public <V> Promise<V> submit(final Callable< ? extends V> task) { + requireNonNull(task); + final Deferred<V> deferred = deferred(); + try { + executor().execute(new Runnable() { + @Override + public void run() { + try { + deferred.resolve(task.call()); + } catch (Throwable t) { + deferred.fail(t); + } + } + }); + } catch (Exception t) { + deferred.fail(t); + } + return deferred.getPromise(); + } + + /** + * Returns an Executor implementation that executes tasks immediately on the + * thread calling the {@code Executor.execute} method. + * + * @return An Executor implementation that executes tasks immediately on the + * thread calling the {@code Executor.execute} method. + */ + public static Executor inlineExecutor() { + return new InlineExecutor(); + } + + /** + * An Executor implementation which executes the task immediately on the + * thread calling the {@code Executor.execute} method. + * + * @Immutable + */ + private static final class InlineExecutor implements Executor { + InlineExecutor() {} + + @Override + public void execute(Runnable callback) { + callback.run(); + } + } + + /** + * Default executors for Promises. + * + * @Immutable + */ + private static final class DefaultExecutors + implements ThreadFactory, RejectedExecutionHandler, Runnable { + private static final DefaultExecutors callbacks; + private static final ScheduledExecutor scheduledExecutor; + private static final ThreadPoolExecutor callbackExecutor; + static { + callbacks = new DefaultExecutors(); + scheduledExecutor = new ScheduledExecutor(2, callbacks); + callbackExecutor = new ThreadPoolExecutor(0, 64, 60L, + TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + callbacks, callbacks); + } + + static Executor callbackExecutor() { + return callbackExecutor; + } + + static ScheduledExecutorService scheduledExecutor() { + return scheduledExecutor; + } + + private final AtomicBoolean shutdownHookInstalled; + private final ThreadFactory delegateThreadFactory; + + private DefaultExecutors() { + shutdownHookInstalled = new AtomicBoolean(); + delegateThreadFactory = Executors.defaultThreadFactory(); + } + + /** + * Executor threads should not prevent VM from exiting + */ + @Override + public Thread newThread(Runnable r) { + if (shutdownHookInstalled.compareAndSet(false, true)) { + Thread shutdownThread = delegateThreadFactory.newThread(this); + shutdownThread.setName( + "ExecutorShutdownHook," + shutdownThread.getName()); + try { + Runtime.getRuntime().addShutdownHook(shutdownThread); + } catch (IllegalStateException e) { + // VM is already shutting down... + callbackExecutor.shutdown(); + scheduledExecutor.shutdown(); + } + } + Thread t = delegateThreadFactory.newThread(r); + t.setName("PromiseImpl," + t.getName()); + t.setDaemon(true); + return t; + } + + /** + * Call the callback using the caller's thread because the thread pool + * rejected the execution. + */ + @Override + public void rejectedExecution(Runnable callback, + ThreadPoolExecutor executor) { + try { + callback.run(); + } catch (Throwable t) { + uncaughtException(t); + } + } + + /** + * Shutdown hook + */ + @Override + public void run() { + // limit new thread creation + callbackExecutor.setMaximumPoolSize( + Math.max(1, callbackExecutor.getPoolSize())); + // Run all delayed callbacks now + scheduledExecutor.shutdown(); + BlockingQueue<Runnable> queue = scheduledExecutor.getQueue(); + if (!queue.isEmpty()) { + for (Object r : queue.toArray()) { + if (r instanceof RunnableScheduledFuture< ? >) { + RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r; + if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L) + && queue.remove(future)) { + future.run(); + scheduledExecutor.afterExecute(future, null); + } + } + } + scheduledExecutor.shutdown(); + } + try { + scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + // Shutdown callback executor + callbackExecutor.shutdown(); + try { + callbackExecutor.awaitTermination(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * ScheduledThreadPoolExecutor for scheduled execution. + * + * @ThreadSafe + */ + private static final class ScheduledExecutor + extends ScheduledThreadPoolExecutor { + ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + } + + /** + * Handle uncaught exceptions + */ + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if ((t == null) && (r instanceof Future< ? >)) { + boolean interrupted = Thread.interrupted(); + try { + ((Future< ? >) r).get(); + } catch (CancellationException e) { + // ignore + } catch (InterruptedException e) { + interrupted = true; + } catch (ExecutionException e) { + t = e.getCause(); + } finally { + if (interrupted) { // restore interrupt status + Thread.currentThread().interrupt(); + } + } + } + if (t != null) { + uncaughtException(t); + } + } + } + } +} diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java index 1a65fc8ef..5d02ff653 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java @@ -20,27 +20,13 @@ import static java.util.Objects.requireNonNull; import java.lang.reflect.InvocationTargetException; import java.util.NoSuchElementException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.RunnableScheduledFuture; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.osgi.util.function.Callback; +import org.osgi.util.function.Consumer; import org.osgi.util.function.Function; import org.osgi.util.function.Predicate; @@ -56,15 +42,9 @@ import org.osgi.util.function.Predicate; */ final class PromiseImpl<T> implements Promise<T> { /** - * The executor to use for callbacks. If {@code null}, the default callback - * executor is used. + * The executors to use for callbacks and scheduled operations. */ - private final Executor callbackExecutor; - /** - * The executor to use for scheduled operations. If {@code null}, the - * default scheduled executor is used. - */ - private final ScheduledExecutorService scheduledExecutor; + private final PromiseExecutors executors; /** * A ConcurrentLinkedQueue to hold the callbacks for this Promise, so no * additional synchronization is required to write to or read from the @@ -104,16 +84,11 @@ final class PromiseImpl<T> implements Promise<T> { /** * Initialize this Promise. * - * @param callbackExecutor The executor to use for callbacks. {@code null} - * can be specified for the default callback executor. - * @param scheduledExecutor The scheduled executor for use for scheduled - * operations. {@code null} can be specified for the default - * scheduled executor. + * @param executors The executors to use for callbacks and scheduled + * operations. */ - PromiseImpl(Executor callbackExecutor, - ScheduledExecutorService scheduledExecutor) { - this.callbackExecutor = callbackExecutor; - this.scheduledExecutor = scheduledExecutor; + PromiseImpl(PromiseExecutors executors) { + this.executors = executors; callbacks = new ConcurrentLinkedQueue<>(); resolved = new CountDownLatch(1); } @@ -123,21 +98,16 @@ final class PromiseImpl<T> implements Promise<T> { * * @param v The value of this resolved Promise. * @param f The failure of this resolved Promise. - * @param callbackExecutor The executor to use for callbacks. {@code null} - * can be specified for the default callback executor. - * @param scheduledExecutor The scheduled executor for use for scheduled - * operations. {@code null} can be specified for the default - * scheduled executor. + * @param executors The executors to use for callbacks and scheduled + * operations. */ - PromiseImpl(T v, Throwable f, Executor callbackExecutor, - ScheduledExecutorService scheduledExecutor) { + PromiseImpl(T v, Throwable f, PromiseExecutors executors) { if (f == null) { value = v; } else { fail = f; } - this.callbackExecutor = callbackExecutor; - this.scheduledExecutor = scheduledExecutor; + this.executors = executors; callbacks = new ConcurrentLinkedQueue<>(); resolved = new CountDownLatch(0); } @@ -226,23 +196,13 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public String toString() { - if (!isDone()) { + if (!isDone()) { // ensure latch open before reading state return super.toString() + "[unresolved]"; } - final boolean interrupted = Thread.interrupted(); - try { - Throwable t = getFailure(); - if (t != null) { - return super.toString() + "[failed: " + t + "]"; - } - return super.toString() + "[resolved: " + getValue() + "]"; - } catch (InterruptedException | InvocationTargetException e) { - return super.toString() + "[" + e + "]"; - } finally { - if (interrupted) { // restore interrupt status - Thread.currentThread().interrupt(); - } + if (fail == null) { + return super.toString() + "[resolved: " + value + "]"; } + return super.toString() + "[failed: " + fail + "]"; } /** @@ -262,20 +222,15 @@ final class PromiseImpl<T> implements Promise<T> { if (resolved.getCount() != 0) { return; // return if not resolved } - /* * Note: multiple threads can be in this method removing callbacks from * the queue and executing them, so the order in which callbacks are * executed cannot be specified. */ - Executor executor = callbackExecutor; for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) { - if (executor == null) { - executor = DefaultExecutors.callbackExecutor(); - } try { try { - executor.execute(callback); + executors.executor().execute(callback); } catch (RejectedExecutionException e) { callback.run(); } @@ -302,13 +257,38 @@ final class PromiseImpl<T> implements Promise<T> { /** * {@inheritDoc} + * + * @since 1.1 */ @Override - public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) { - PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); - onResolve(chained.new Then<>(this, success, failure)); - return chained; + public Promise<T> onSuccess(Consumer< ? super T> success) { + return onResolve(new OnSuccess(success)); + } + + /** + * A callback used for the {@link #onSuccess(Consumer)} method. + * + * @Immutable + * @since 1.1 + */ + private final class OnSuccess implements Runnable { + private final Consumer< ? super T> success; + + OnSuccess(Consumer< ? super T> success) { + this.success = requireNonNull(success); + } + + @Override + public void run() { + Result<T> result = collect(); + if (result.fail == null) { + try { + success.accept(result.value); + } catch (Throwable e) { + uncaughtException(e); + } + } + } } /** @@ -317,34 +297,67 @@ final class PromiseImpl<T> implements Promise<T> { * @since 1.1 */ @Override - public <R> Promise<R> then(Success<? super T, ? extends R> success) { - return then(success, null); + public Promise<T> onFailure(Consumer< ? super Throwable> failure) { + return onResolve(new OnFailure(failure)); + } + + /** + * A callback used for the {@link #onFailure(Consumer)} method. + * + * @Immutable + * @since 1.1 + */ + private final class OnFailure implements Runnable { + private final Consumer< ? super Throwable> failure; + + OnFailure(Consumer< ? super Throwable> failure) { + this.failure = requireNonNull(failure); + } + + @Override + public void run() { + Result<T> result = collect(); + if (result.fail != null) { + try { + failure.accept(result.fail); + } catch (Throwable e) { + uncaughtException(e); + } + } + } } /** * {@inheritDoc} */ @Override - public Promise<T> then(Callback callback) { - PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); - onResolve(chained.new Chain(this, callback)); + public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) { + PromiseImpl<R> chained = new PromiseImpl<>(executors); + onResolve(chained.new Then<>(this, success, failure)); return chained; } /** + * {@inheritDoc} + */ + @Override + public <R> Promise<R> then(Success<? super T, ? extends R> success) { + return then(success, null); + } + + /** * A callback used to chain promises for the {@link #then(Success, Failure)} * method. * * @Immutable */ private final class Then<P> implements Runnable { - private final Promise<P> promise; + private final PromiseImpl<P> promise; private final Success<P, ? extends T> success; private final Failure failure; @SuppressWarnings("unchecked") - Then(Promise<P> promise, Success< ? super P, ? extends T> success, + Then(PromiseImpl<P> promise, Success< ? super P, ? extends T> success, Failure failure) { this.promise = promise; this.success = (Success<P, ? extends T>) success; @@ -353,13 +366,13 @@ final class PromiseImpl<T> implements Promise<T> { @Override public void run() { - Throwable f = Result.collect(promise).fail; - if (f != null) { + Result<P> result = promise.collect(); + if (result.fail != null) { if (failure != null) { try { failure.fail(promise); } catch (Throwable e) { - f = e; // propagate new exception + result.fail = e; // propagate new exception } } } else if (success != null) { @@ -367,7 +380,7 @@ final class PromiseImpl<T> implements Promise<T> { try { returned = success.call(promise); } catch (Throwable e) { - f = e; // propagate new exception + result.fail = e; // propagate new exception } if (returned != null) { // resolve chained when returned promise is resolved @@ -375,60 +388,95 @@ final class PromiseImpl<T> implements Promise<T> { return; } } - tryResolve(null, f); + tryResolve(null, result.fail); } } /** - * A callback used to resolve the chained Promise when the Promise promise - * is resolved. + * A callback used to resolve the chained Promise when the Promise is + * resolved. * * @Immutable */ private final class Chain implements Runnable { - private final Promise< ? extends T> promise; - private final Throwable failure; - private final Callback callback; - + private final Promise< ? extends T> promise; + Chain(Promise< ? extends T> promise) { this.promise = promise; - this.failure = null; - this.callback = null; } + + @Override + public void run() { + Result<T> result = collect(promise); + tryResolve(result.value, result.fail); + } + } - Chain(Promise< ? extends T> promise, Throwable failure) { + /** + * A callback used to resolve the chained Promise when the PromiseImpl is + * resolved. + * + * @Immutable + * @since 1.1 + */ + private final class ChainImpl implements Runnable { + private final PromiseImpl<T> promise; + + ChainImpl(PromiseImpl<T> promise) { this.promise = promise; - this.failure = requireNonNull(failure); - this.callback = null; } + + @Override + public void run() { + Result<T> result = promise.collect(); + tryResolve(result.value, result.fail); + } + } + + /** + * {@inheritDoc} + * + * @since 1.1 + */ + @Override + public Promise<T> thenAccept(Consumer< ? super T> consumer) { + PromiseImpl<T> chained = new PromiseImpl<>(executors); + onResolve(chained.new ThenAccept(this, consumer)); + return chained; + } - Chain(Promise< ? extends T> promise, Callback callback) { + /** + * A callback used to resolve the chained Promise and call a Consumer when + * the PromiseImpl is resolved. + * + * @Immutable + * @since 1.1 + */ + private final class ThenAccept implements Runnable { + private final PromiseImpl<T> promise; + private final Consumer< ? super T> consumer; + + ThenAccept(PromiseImpl<T> promise, Consumer< ? super T> consumer) { this.promise = promise; - this.failure = null; - this.callback = requireNonNull(callback); + this.consumer = requireNonNull(consumer); } - + @Override public void run() { - if (callback != null) { + Result<T> result = promise.collect(); + if (result.fail == null) { try { - callback.run(); + consumer.accept(result.value); } catch (Throwable e) { - tryResolve(null, e); - return; + result.fail = e; } } - Result<T> result = Result.collect(promise); - if ((result.fail != null) && (failure != null)) { - result.fail = failure; - } tryResolve(result.value, result.fail); } } /** * Resolve this Promise with the specified Promise. - * * <p> * If the specified Promise is successfully resolved, this Promise is * resolved with the value of the specified Promise. If the specified @@ -436,7 +484,7 @@ final class PromiseImpl<T> implements Promise<T> { * failure of the specified Promise. * * @param with A Promise whose value or failure must be used to resolve this - * Promise. Must not be {@code null}. + * Promise. Must not be {@code null}. * @return A Promise that is resolved only when this Promise is resolved by * the specified Promise. The returned Promise must be successfully * resolved with the value {@code null}, if this Promise was @@ -446,37 +494,36 @@ final class PromiseImpl<T> implements Promise<T> { * resolved. */ Promise<Void> resolveWith(Promise<? extends T> with) { - PromiseImpl<Void> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); - with.onResolve(new ResolveWith(with, chained)); + PromiseImpl<Void> chained = new PromiseImpl<>(executors); + with.onResolve(chained.new ResolveWith<>(this, with)); return chained; } /** - * A callback used to resolve this Promise with another Promise for the + * A callback used to resolve a Promise with another Promise for the * {@link PromiseImpl#resolveWith(Promise)} method. * * @Immutable */ - private final class ResolveWith implements Runnable { - private final Promise< ? extends T> promise; - private final PromiseImpl<Void> chained; + private final class ResolveWith<P> implements Runnable { + private final PromiseImpl<P> promise; + private final Promise< ? extends P> with; - ResolveWith(Promise< ? extends T> promise, PromiseImpl<Void> chained) { + ResolveWith(PromiseImpl<P> promise, Promise< ? extends P> with) { this.promise = promise; - this.chained = chained; + this.with = requireNonNull(with); } @Override public void run() { Throwable f = null; - Result<T> result = Result.collect(promise); + Result<P> result = collect(with); try { - resolve(result.value, result.fail); + promise.resolve(result.value, result.fail); } catch (Throwable e) { f = e; // propagate new exception } - chained.tryResolve(null, f); + tryResolve(null, f); } } @@ -485,8 +532,7 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> filter(Predicate<? super T> predicate) { - PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); + PromiseImpl<T> chained = new PromiseImpl<>(executors); onResolve(chained.new Filter(this, predicate)); return chained; } @@ -497,17 +543,17 @@ final class PromiseImpl<T> implements Promise<T> { * @Immutable */ private final class Filter implements Runnable { - private final Promise< ? extends T> promise; + private final PromiseImpl<T> promise; private final Predicate<? super T> predicate; - Filter(Promise< ? extends T> promise, Predicate< ? super T> predicate) { + Filter(PromiseImpl<T> promise, Predicate< ? super T> predicate) { this.promise = promise; this.predicate = requireNonNull(predicate); } @Override public void run() { - Result<T> result = Result.collect(promise); + Result<T> result = promise.collect(); if (result.fail == null) { try { if (!predicate.test(result.value)) { @@ -526,8 +572,7 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public <R> Promise<R> map(Function<? super T, ? extends R> mapper) { - PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); + PromiseImpl<R> chained = new PromiseImpl<>(executors); onResolve(chained.new Map<>(this, mapper)); return chained; } @@ -538,10 +583,10 @@ final class PromiseImpl<T> implements Promise<T> { * @Immutable */ private final class Map<P> implements Runnable { - private final Promise< ? extends P> promise; + private final PromiseImpl<P> promise; private final Function<? super P, ? extends T> mapper; - Map(Promise< ? extends P> promise, + Map(PromiseImpl<P> promise, Function< ? super P, ? extends T> mapper) { this.promise = promise; this.mapper = requireNonNull(mapper); @@ -549,7 +594,7 @@ final class PromiseImpl<T> implements Promise<T> { @Override public void run() { - Result<P> result = Result.collect(promise); + Result<P> result = promise.collect(); T v = null; if (result.fail == null) { try { @@ -567,8 +612,7 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) { - PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); + PromiseImpl<R> chained = new PromiseImpl<>(executors); onResolve(chained.new FlatMap<>(this, mapper)); return chained; } @@ -579,10 +623,10 @@ final class PromiseImpl<T> implements Promise<T> { * @Immutable */ private final class FlatMap<P> implements Runnable { - private final Promise< ? extends P> promise; + private final PromiseImpl<P> promise; private final Function< ? super P,Promise< ? extends T>> mapper; - FlatMap(Promise< ? extends P> promise, + FlatMap(PromiseImpl<P> promise, Function< ? super P,Promise< ? extends T>> mapper) { this.promise = promise; this.mapper = requireNonNull(mapper); @@ -590,7 +634,7 @@ final class PromiseImpl<T> implements Promise<T> { @Override public void run() { - Result<P> result = Result.collect(promise); + Result<P> result = promise.collect(); if (result.fail == null) { Promise< ? extends T> flatmap = null; try { @@ -612,8 +656,7 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) { - PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); + PromiseImpl<T> chained = new PromiseImpl<>(executors); onResolve(chained.new Recover(this, recovery)); return chained; } @@ -624,10 +667,10 @@ final class PromiseImpl<T> implements Promise<T> { * @Immutable */ private final class Recover implements Runnable { - private final Promise<T> promise; + private final PromiseImpl<T> promise; private final Function<Promise< ? >, ? extends T> recovery; - Recover(Promise<T> promise, + Recover(PromiseImpl<T> promise, Function<Promise< ? >, ? extends T> recovery) { this.promise = promise; this.recovery = requireNonNull(recovery); @@ -635,7 +678,7 @@ final class PromiseImpl<T> implements Promise<T> { @Override public void run() { - Result<T> result = Result.collect(promise); + Result<T> result = promise.collect(); if (result.fail != null) { try { T v = recovery.apply(promise); @@ -656,8 +699,7 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) { - PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); + PromiseImpl<T> chained = new PromiseImpl<>(executors); onResolve(chained.new RecoverWith(this, recovery)); return chained; } @@ -668,10 +710,10 @@ final class PromiseImpl<T> implements Promise<T> { * @Immutable */ private final class RecoverWith implements Runnable { - private final Promise<T> promise; + private final PromiseImpl<T> promise; private final Function<Promise<?>, Promise<? extends T>> recovery; - RecoverWith(Promise<T> promise, + RecoverWith(PromiseImpl<T> promise, Function<Promise< ? >,Promise< ? extends T>> recovery) { this.promise = promise; this.recovery = requireNonNull(recovery); @@ -679,7 +721,7 @@ final class PromiseImpl<T> implements Promise<T> { @Override public void run() { - Result<T> result = Result.collect(promise); + Result<T> result = promise.collect(); if (result.fail != null) { Promise< ? extends T> recovered = null; try { @@ -701,8 +743,7 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> fallbackTo(Promise<? extends T> fallback) { - PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); + PromiseImpl<T> chained = new PromiseImpl<>(executors); onResolve(chained.new FallbackTo(this, fallback)); return chained; } @@ -713,21 +754,47 @@ final class PromiseImpl<T> implements Promise<T> { * @Immutable */ private final class FallbackTo implements Runnable { - private final Promise<T> promise; + private final PromiseImpl<T> promise; private final Promise<? extends T> fallback; - FallbackTo(Promise<T> promise, Promise< ? extends T> fallback) { + FallbackTo(PromiseImpl<T> promise, Promise< ? extends T> fallback) { this.promise = promise; this.fallback = requireNonNull(fallback); } @Override public void run() { - Result<T> result = Result.collect(promise); + Result<T> result = promise.collect(); if (result.fail != null) { - fallback.onResolve(new Chain(fallback, result.fail)); + fallback.onResolve(new FallbackChain(fallback, result.fail)); return; } + tryResolve(result.value, null); + } + } + + /** + * A callback used to resolve the chained Promise when the fallback Promise + * is resolved. + * + * @Immutable + * @since 1.1 + */ + private final class FallbackChain implements Runnable { + private final Promise< ? extends T> fallback; + private final Throwable failure; + + FallbackChain(Promise< ? extends T> fallback, Throwable failure) { + this.fallback = fallback; + this.failure = failure; + } + + @Override + public void run() { + Result<T> result = collect(fallback); + if (result.fail != null) { + result.fail = failure; + } tryResolve(result.value, result.fail); } } @@ -739,13 +806,10 @@ final class PromiseImpl<T> implements Promise<T> { */ ScheduledFuture< ? > schedule(Runnable operation, long delay, TimeUnit unit) { - ScheduledExecutorService executor = scheduledExecutor; - if (executor == null) { - executor = DefaultExecutors.scheduledExecutor(); - } try { try { - return executor.schedule(operation, delay, unit); + return executors.scheduledExecutor().schedule(operation, delay, + unit); } catch (RejectedExecutionException e) { operation.run(); } @@ -762,12 +826,14 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> timeout(long millis) { - PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); + PromiseImpl<T> chained = new PromiseImpl<>(executors); if (!isDone()) { - onResolve(chained.new Timeout(millis, TimeUnit.MILLISECONDS)); + PromiseImpl<T> timedout = new PromiseImpl<>(null, + new TimeoutException(), executors); + onResolve(new Timeout(chained.new ChainImpl(timedout), millis, + TimeUnit.MILLISECONDS)); } - onResolve(chained.new Chain(this)); + onResolve(chained.new ChainImpl(this)); return chained; } @@ -781,8 +847,8 @@ final class PromiseImpl<T> implements Promise<T> { private final class Timeout implements Runnable { private final ScheduledFuture< ? > future; - Timeout(long timeout, TimeUnit unit) { - future = schedule(new TimeoutAction(), timeout, unit); + Timeout(Runnable operation, long delay, TimeUnit unit) { + future = schedule(operation, delay, unit); } @Override @@ -794,30 +860,14 @@ final class PromiseImpl<T> implements Promise<T> { } /** - * Callback used to fail the Promise if the timeout expires. - * - * @Immutable - * @since 1.1 - */ - private final class TimeoutAction implements Runnable { - TimeoutAction() {} - - @Override - public void run() { - tryResolve(null, new TimeoutException()); - } - } - - /** * {@inheritDoc} * * @since 1.1 */ @Override public Promise<T> delay(long millis) { - PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor, - scheduledExecutor); - onResolve(new Delay(chained.new Chain(this), millis, + PromiseImpl<T> chained = new PromiseImpl<>(executors); + onResolve(new Delay(chained.new ChainImpl(this), millis, TimeUnit.MILLISECONDS)); return chained; } @@ -847,183 +897,68 @@ final class PromiseImpl<T> implements Promise<T> { } /** - * Default executors for callbacks. + * A holder of the result of a Promise. * - * @Immutable + * @NotThreadSafe * @since 1.1 */ - private static final class DefaultExecutors - implements ThreadFactory, RejectedExecutionHandler, Runnable { - private static final DefaultExecutors callbacks; - private static final ScheduledExecutor scheduledExecutor; - private static final ThreadPoolExecutor callbackExecutor; - static { - callbacks = new DefaultExecutors(); - scheduledExecutor = new ScheduledExecutor(2, callbacks); - callbackExecutor = new ThreadPoolExecutor(0, 64, 60L, - TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), - callbacks, callbacks); - } - - static Executor callbackExecutor() { - return callbackExecutor; - } - - static ScheduledExecutorService scheduledExecutor() { - return scheduledExecutor; - } - - private final AtomicBoolean shutdownHookInstalled; - private final ThreadFactory delegateThreadFactory; - - private DefaultExecutors() { - shutdownHookInstalled = new AtomicBoolean(); - delegateThreadFactory = Executors.defaultThreadFactory(); - } - - /** - * Executor threads should not prevent VM from exiting - */ - @Override - public Thread newThread(Runnable r) { - if (shutdownHookInstalled.compareAndSet(false, true)) { - Thread shutdownThread = delegateThreadFactory.newThread(this); - shutdownThread.setName("ExecutorShutdownHook," - + shutdownThread.getName()); - try { - Runtime.getRuntime().addShutdownHook(shutdownThread); - } catch (IllegalStateException e) { - // VM is already shutting down... - callbackExecutor.shutdown(); - scheduledExecutor.shutdown(); - } - } - Thread t = delegateThreadFactory.newThread(r); - t.setName("PromiseImpl," + t.getName()); - t.setDaemon(true); - return t; + static final class Result<P> { + Throwable fail; + P value; + + Result(P value) { + this.value = value; + this.fail = null; } - - /** - * Call the callback using the caller's thread because the thread pool - * rejected the execution. - */ - @Override - public void rejectedExecution(Runnable callback, - ThreadPoolExecutor executor) { - try { - callback.run(); - } catch (Throwable t) { - uncaughtException(t); - } + + Result(Throwable fail) { + this.value = null; + this.fail = fail; } + } - /** - * Shutdown hook - */ - @Override - public void run() { - // limit new thread creation - callbackExecutor.setMaximumPoolSize( - Math.max(1, callbackExecutor.getPoolSize())); - // Run all delayed callbacks now - scheduledExecutor.shutdown(); - BlockingQueue<Runnable> queue = scheduledExecutor.getQueue(); - if (!queue.isEmpty()) { - for (Object r : queue.toArray()) { - if (r instanceof RunnableScheduledFuture< ? >) { - RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r; - if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L) - && queue.remove(future)) { - future.run(); - scheduledExecutor.afterExecute(future, null); - } - } - } - scheduledExecutor.shutdown(); - } - try { - scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - // Shutdown callback executor - callbackExecutor.shutdown(); - try { - callbackExecutor.awaitTermination(20, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + /** + * Return a holder of the result of this PromiseImpl. + * + * @since 1.1 + */ + Result<T> collect() { + if (!isDone()) { // ensure latch open before reading state + return new Result<T>(new AssertionError("promise not resolved")); } - - /** - * ScheduledThreadPoolExecutor for scheduled execution. - * - * @ThreadSafe - */ - private static final class ScheduledExecutor - extends ScheduledThreadPoolExecutor { - ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) { - super(corePoolSize, threadFactory); - } - - /** - * Handle uncaught exceptions - */ - @Override - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - if ((t == null) && (r instanceof Future< ? >)) { - boolean interrupted = Thread.interrupted(); - try { - ((Future< ? >) r).get(); - } catch (CancellationException e) { - // ignore - } catch (InterruptedException e) { - interrupted = true; - } catch (ExecutionException e) { - t = e.getCause(); - } finally { - if (interrupted) { // restore interrupt status - Thread.currentThread().interrupt(); - } - } - } - if (t != null) { - uncaughtException(t); - } - } + if (fail == null) { + return new Result<T>(value); } + return new Result<T>(fail); } /** - * A holder of the result of a promise. + * Return a holder of the result of the specified Promise. * - * @NotThreadSafe * @since 1.1 */ - static final class Result<P> { - Throwable fail; - P value; - - Result() {} - - static <R> Result<R> collect(Promise< ? extends R> promise) { - Result<R> result = new Result<>(); - final boolean interrupted = Thread.interrupted(); - try { - result.fail = promise.getFailure(); - if (result.fail == null) { - result.value = promise.getValue(); - } - } catch (Throwable e) { - result.fail = e; // propagate new exception - } finally { - if (interrupted) { // restore interrupt status - Thread.currentThread().interrupt(); - } + static <R> Result<R> collect(Promise< ? extends R> promise) { + if (promise instanceof PromiseImpl) { + @SuppressWarnings("unchecked") + PromiseImpl<R> impl = (PromiseImpl<R>) promise; + return impl.collect(); + } + if (!promise.isDone()) { + return new Result<R>(new AssertionError("promise not resolved")); + } + final boolean interrupted = Thread.interrupted(); + try { + Throwable fail = promise.getFailure(); + if (fail == null) { + return new Result<R>(promise.getValue()); + } + return new Result<R>(fail); + } catch (Throwable e) { + return new Result<R>(e); // propagate new exception + } finally { + if (interrupted) { // restore interrupt status + Thread.currentThread().interrupt(); } - return result; } } } diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java index 394bce4bb..a25bdf3c5 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java @@ -16,7 +16,7 @@ package org.osgi.util.promise; -import static java.util.Objects.requireNonNull; +import static org.osgi.util.promise.PromiseExecutors.defaultExecutors; import java.util.ArrayList; import java.util.Arrays; @@ -42,10 +42,13 @@ public class Promises { * * @param <T> The value type associated with the returned Promise. * @param value The value of the resolved Promise. - * @return A new Promise that has been resolved with the specified value. + * @return A new Promise which uses the default callback executor and + * default scheduled executor that has been resolved with the + * specified value. + * @see PromiseExecutors#resolved(Object) */ public static <T> Promise<T> resolved(T value) { - return new PromiseImpl<>(value, null, null, null); + return defaultExecutors.resolved(value); } /** @@ -54,10 +57,13 @@ public class Promises { * @param <T> The value type associated with the returned Promise. * @param failure The failure of the resolved Promise. Must not be * {@code null}. - * @return A new Promise that has been resolved with the specified failure. + * @return A new Promise which uses the default callback executor and + * default scheduled executor that has been resolved with the + * specified failure. + * @see PromiseExecutors#failed(Throwable) */ public static <T> Promise<T> failed(Throwable failure) { - return new PromiseImpl<>(null, requireNonNull(failure), null, null); + return defaultExecutors.failed(failure); } /** @@ -74,19 +80,21 @@ public class Promises { * @param promises The Promises which must be resolved before the returned * Promise must be resolved. Must not be {@code null} and all of * the elements in the collection must not be {@code null}. - * @return A Promise that is resolved only when all the specified Promises - * are resolved. The returned Promise must be successfully resolved - * with a List of the values in the order of the specified Promises - * if all the specified Promises are successfully resolved. The List - * in the returned Promise is the property of the caller and is - * modifiable. The returned Promise must be resolved with a failure - * of {@link FailedPromisesException} if any of the specified - * Promises are resolved with a failure. The failure + * @return A Promise which uses the default callback executor and default + * scheduled executor that is resolved only when all the specified + * Promises are resolved. The returned Promise must be successfully + * resolved with a List of the values in the order of the specified + * Promises if all the specified Promises are successfully resolved. + * The List in the returned Promise is the property of the caller + * and is modifiable. The returned Promise must be resolved with a + * failure of {@link FailedPromisesException} if any of the + * specified Promises are resolved with a failure. The failure * {@link FailedPromisesException} must contain all of the specified * Promises which resolved with a failure. + * @see #all(Deferred, Collection) */ public static <T, S extends T> Promise<List<T>> all(Collection<Promise<S>> promises) { - return all(new Deferred<List<T>>(), promises); + return all(defaultExecutors.<List<T>> deferred(), promises); } /** @@ -118,6 +126,7 @@ public class Promises { * {@link FailedPromisesException} must contain all of the specified * Promises which resolved with a failure. * @since 1.1 + * @see PromiseExecutors#deferred() */ public static <T, S extends T> Promise<List<T>> all( Deferred<List<T>> deferred, Collection<Promise<S>> promises) { @@ -147,7 +156,8 @@ public class Promises { * @param promises The Promises which must be resolved before the returned * Promise must be resolved. Must not be {@code null} and all of * the arguments must not be {@code null}. - * @return A Promise that is resolved only when all the specified Promises + * @return A Promise which uses the default callback executor and scheduled + * executor that is resolved only when all the specified Promises * are resolved. The returned Promise must be successfully resolved * with a List of the values in the order of the specified Promises * if all the specified Promises are successfully resolved. The List @@ -157,6 +167,7 @@ public class Promises { * Promises are resolved with a failure. The failure * {@link FailedPromisesException} must contain all of the specified * Promises which resolved with a failure. + * @see #all(Deferred, Promise...) */ @SafeVarargs public static <T> Promise<List<T>> all(Promise<? extends T>... promises) { @@ -191,6 +202,7 @@ public class Promises { * {@link FailedPromisesException} must contain all of the specified * Promises which resolved with a failure. * @since 1.1 + * @see PromiseExecutors#deferred() */ @SafeVarargs public static <T> Promise<List<T>> all(Deferred<List<T>> deferred, @@ -229,7 +241,7 @@ public class Promises { List<Promise<?>> failed = new ArrayList<>(promises.size()); Throwable cause = null; for (Promise<? extends T> promise : promises) { - Result<T> result = Result.collect(promise); + Result<T> result = PromiseImpl.collect(promise); if (result.fail != null) { failed.add(promise); if (cause == null) { |