Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Watson2017-11-13 13:59:51 -0500
committerThomas Watson2017-11-13 14:04:00 -0500
commit7ac54c883ba1aa4dc3e0bdfa981f090fbc3bcb1c (patch)
tree43440d7c400a4f0331cb74c47803e2893d7fe270
parentaf9796c48cdef3e4f83fbd382fbb69b4441267c4 (diff)
downloadrt.equinox.framework-I20171114-2000.tar.gz
rt.equinox.framework-I20171114-2000.tar.xz
rt.equinox.framework-I20171114-2000.zip
Bug 527221 - [osgi R7] update log stream impl and APII20171115-0115I20171115-0025I20171114-2000I20171113-2000
Change-Id: Id5a0c165285c6b0bf496c948f26616161f5df4ab Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Consumer.java (renamed from bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java)12
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java40
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java113
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseExecutors.java383
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java611
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java44
6 files changed, 780 insertions, 423 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Consumer.java
index 17ff376bc..3636c6043 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Consumer.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2016). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2017). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,22 +19,24 @@ package org.osgi.util.function;
import org.osgi.annotation.versioning.ConsumerType;
/**
- * A callback that performs an operation and may throw an exception.
+ * A function that accepts a single argument and produces no result.
* <p>
* This is a functional interface and can be used as the assignment target for a
* lambda expression or method reference.
*
+ * @param <T> The type of the function input.
* @ThreadSafe
* @since 1.1
* @author $Id$
*/
@ConsumerType
@FunctionalInterface
-public interface Callback {
+public interface Consumer<T> {
/**
- * Execute the callback.
+ * Applies this function to the specified argument.
*
+ * @param t The input to this function.
* @throws Exception An exception thrown by the method.
*/
- void run() throws Exception;
+ void accept(T t) throws Exception;
}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java
index 25c7442d3..e2c26cb80 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java
@@ -18,8 +18,7 @@ package org.osgi.util.promise;
import static java.util.Objects.requireNonNull;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
+import org.osgi.annotation.versioning.ProviderType;
/**
* A Deferred Promise resolution.
@@ -41,46 +40,35 @@ import java.util.concurrent.ScheduledExecutorService;
* @Immutable
* @author $Id$
*/
+@ProviderType
public class Deferred<T> {
- private final PromiseImpl<T> promise;
-
/**
- * Create a new Deferred.
- * <p>
- * The default callback executor and default scheduled executor will be
- * used.
+ * The Promise associated with this Deferred.
*/
- public Deferred() {
- this(null, null);
- }
+ private final PromiseImpl<T> promise;
/**
- * Create a new Deferred with the specified callback executor.
+ * Create a new Deferred.
* <p>
- * The default scheduled executor will be used.
+ * The {@link #getPromise() associated promise} will use the default
+ * callback executor and default scheduled executor.
*
- * @param callbackExecutor The executor to use for callbacks. {@code null}
- * can be specified for the default callback executor.
- * @since 1.1
+ * @see PromiseExecutors#deferred()
*/
- public Deferred(Executor callbackExecutor) {
- this(callbackExecutor, null);
+ public Deferred() {
+ this(PromiseExecutors.defaultExecutors);
}
/**
* Create a new Deferred with the specified callback and scheduled
* executors.
*
- * @param callbackExecutor The executor to use for callbacks. {@code null}
- * can be specified for the default callback executor.
- * @param scheduledExecutor The scheduled executor for use for scheduled
- * operations. {@code null} can be specified for the default
- * scheduled executor.
+ * @param executors The executors to use for callbacks and scheduled
+ * operations.
* @since 1.1
*/
- public Deferred(Executor callbackExecutor,
- ScheduledExecutorService scheduledExecutor) {
- promise = new PromiseImpl<>(callbackExecutor, scheduledExecutor);
+ Deferred(PromiseExecutors executors) {
+ promise = new PromiseImpl<>(executors);
}
/**
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java
index f9e4ef5d2..a2082aef3 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java
@@ -19,7 +19,7 @@ package org.osgi.util.promise;
import java.lang.reflect.InvocationTargetException;
import org.osgi.annotation.versioning.ProviderType;
-import org.osgi.util.function.Callback;
+import org.osgi.util.function.Consumer;
import org.osgi.util.function.Function;
import org.osgi.util.function.Predicate;
@@ -115,41 +115,85 @@ public interface Promise<T> {
/**
* Register a callback to be called when this Promise is resolved.
- *
* <p>
* The specified callback is called when this Promise is resolved either
* successfully or with a failure.
- *
* <p>
* This method may be called at any time including before and after this
* Promise has been resolved.
+ * <p>
+ * Resolving this Promise <i>happens-before</i> any registered callback is
+ * called. That is, in a registered callback, {@link #isDone()} must return
+ * {@code true} and {@link #getValue()} and {@link #getFailure()} must not
+ * block.
+ * <p>
+ * A callback may be called on a different thread than the thread which
+ * registered the callback. So the callback must be thread safe but can rely
+ * upon that the registration of the callback <i>happens-before</i> the
+ * registered callback is called.
*
+ * @param callback The callback to be called when this Promise is resolved.
+ * Must not be {@code null}.
+ * @return This Promise.
+ */
+ Promise<T> onResolve(Runnable callback);
+
+ /**
+ * Register a callback to be called with the result of this Promise when
+ * this Promise is resolved successfully. The callback will not be called if
+ * this Promise is resolved with a failure.
+ * <p>
+ * This method may be called at any time including before and after this
+ * Promise has been resolved.
* <p>
* Resolving this Promise <i>happens-before</i> any registered callback is
* called. That is, in a registered callback, {@link #isDone()} must return
* {@code true} and {@link #getValue()} and {@link #getFailure()} must not
* block.
+ * <p>
+ * A callback may be called on a different thread than the thread which
+ * registered the callback. So the callback must be thread safe but can rely
+ * upon that the registration of the callback <i>happens-before</i> the
+ * registered callback is called.
*
+ * @param success The Consumer callback that receives the the value of this
+ * Promise. Must not be {@code null}.
+ * @return This Promise.
+ * @since 1.1
+ */
+ Promise<T> onSuccess(Consumer< ? super T> success);
+
+ /**
+ * Register a callback to be called with the failure for this Promise when
+ * this Promise is resolved with a failure. The callback will not be called
+ * if this Promise is resolved successfully.
+ * <p>
+ * This method may be called at any time including before and after this
+ * Promise has been resolved.
+ * <p>
+ * Resolving this Promise <i>happens-before</i> any registered callback is
+ * called. That is, in a registered callback, {@link #isDone()} must return
+ * {@code true} and {@link #getValue()} and {@link #getFailure()} must not
+ * block.
* <p>
* A callback may be called on a different thread than the thread which
* registered the callback. So the callback must be thread safe but can rely
* upon that the registration of the callback <i>happens-before</i> the
* registered callback is called.
*
- * @param callback A callback to be called when this Promise is resolved.
- * Must not be {@code null}.
+ * @param failure The Consumer callback that receives the the failure of
+ * this Promise. Must not be {@code null}.
* @return This Promise.
+ * @since 1.1
*/
- Promise<T> onResolve(Runnable callback);
+ Promise<T> onFailure(Consumer< ? super Throwable> failure);
/**
* Chain a new Promise to this Promise with Success and Failure callbacks.
- *
* <p>
* The specified {@link Success} callback is called when this Promise is
* successfully resolved and the specified {@link Failure} callback is
* called when this Promise is resolved with a failure.
- *
* <p>
* This method returns a new Promise which is chained to this Promise. The
* returned Promise must be resolved when this Promise is resolved after the
@@ -157,24 +201,20 @@ public interface Promise<T> {
* executed callback must be used to resolve the returned Promise. Multiple
* calls to this method can be used to create a chain of promises which are
* resolved in sequence.
- *
* <p>
* If this Promise is successfully resolved, the Success callback is
* executed and the result Promise, if any, or thrown exception is used to
* resolve the returned Promise from this method. If this Promise is
* resolved with a failure, the Failure callback is executed and the
* returned Promise from this method is failed.
- *
* <p>
* This method may be called at any time including before and after this
* Promise has been resolved.
- *
* <p>
* Resolving this Promise <i>happens-before</i> any registered callback is
* called. That is, in a registered callback, {@link #isDone()} must return
* {@code true} and {@link #getValue()} and {@link #getFailure()} must not
* block.
- *
* <p>
* A callback may be called on a different thread than the thread which
* registered the callback. So the callback must be thread safe but can rely
@@ -182,14 +222,14 @@ public interface Promise<T> {
* registered callback is called.
*
* @param <R> The value type associated with the returned Promise.
- * @param success A Success callback to be called when this Promise is
- * successfully resolved. May be {@code null} if no Success callback
- * is required. In this case, the returned Promise must be resolved
- * with the value {@code null} when this Promise is successfully
- * resolved.
- * @param failure A Failure callback to be called when this Promise is
- * resolved with a failure. May be {@code null} if no Failure
- * callback is required.
+ * @param success The Success callback to be called when this Promise is
+ * successfully resolved. May be {@code null} if no Success
+ * callback is required. In this case, the returned Promise must
+ * be resolved with the value {@code null} when this Promise is
+ * successfully resolved.
+ * @param failure The Failure callback to be called when this Promise is
+ * resolved with a failure. May be {@code null} if no Failure
+ * callback is required.
* @return A new Promise which is chained to this Promise. The returned
* Promise must be resolved when this Promise is resolved after the
* specified Success or Failure callback, if any, is executed.
@@ -198,18 +238,17 @@ public interface Promise<T> {
/**
* Chain a new Promise to this Promise with a Success callback.
- *
* <p>
* This method performs the same function as calling
* {@link #then(Success, Failure)} with the specified Success callback and
* {@code null} for the Failure callback.
*
* @param <R> The value type associated with the returned Promise.
- * @param success A Success callback to be called when this Promise is
- * successfully resolved. May be {@code null} if no Success callback
- * is required. In this case, the returned Promise must be resolved
- * with the value {@code null} when this Promise is successfully
- * resolved.
+ * @param success The Success callback to be called when this Promise is
+ * successfully resolved. May be {@code null} if no Success
+ * callback is required. In this case, the returned Promise must
+ * be resolved with the value {@code null} when this Promise is
+ * successfully resolved.
* @return A new Promise which is chained to this Promise. The returned
* Promise must be resolved when this Promise is resolved after the
* specified Success, if any, is executed.
@@ -218,16 +257,17 @@ public interface Promise<T> {
<R> Promise<R> then(Success<? super T, ? extends R> success);
/**
- * Chain a new Promise to this Promise with a callback.
+ * Chain a new Promise to this Promise with a Consumer callback that
+ * receives the value of this Promise when it is successfully resolved.
* <p>
- * The specified {@link Callback} is called when this Promise is resolved
- * either successfully or with a failure.
+ * The specified {@link Consumer} is called when this Promise is resolved
+ * successfully.
* <p>
* This method returns a new Promise which is chained to this Promise. The
* returned Promise must be resolved when this Promise is resolved after the
* specified callback is executed. If the callback throws an exception, the
* returned Promise is failed with that exception. Otherwise the returned
- * Promise is resolved with this Promise.
+ * Promise is resolved with the success value from this Promise.
* <p>
* This method may be called at any time including before and after this
* Promise has been resolved.
@@ -242,18 +282,17 @@ public interface Promise<T> {
* upon that the registration of the callback <i>happens-before</i> the
* registered callback is called.
*
- * @param callback A callback to be called when this Promise is resolved.
- * Must not be {@code null}.
+ * @param consumer The Consumer callback that receives the the value of this
+ * Promise. Must not be {@code null}.
* @return A new Promise which is chained to this Promise. The returned
* Promise must be resolved when this Promise is resolved after the
- * specified callback is executed.
+ * specified Consumer is executed.
* @since 1.1
*/
- Promise<T> then(Callback callback);
+ Promise<T> thenAccept(Consumer< ? super T> consumer);
/**
* Filter the value of this Promise.
- *
* <p>
* If this Promise is successfully resolved, the returned Promise must
* either be resolved with the value of this Promise, if the specified
@@ -261,17 +300,15 @@ public interface Promise<T> {
* {@code NoSuchElementException}, if the specified Predicate does not
* accept that value. If the specified Predicate throws an exception, the
* returned Promise must be failed with the exception.
- *
* <p>
* If this Promise is resolved with a failure, the returned Promise must be
* failed with that failure.
- *
* <p>
* This method may be called at any time including before and after this
* Promise has been resolved.
*
* @param predicate The Predicate to evaluate the value of this Promise.
- * Must not be {@code null}.
+ * Must not be {@code null}.
* @return A Promise that filters the value of this Promise.
*/
Promise<T> filter(Predicate<? super T> predicate);
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseExecutors.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseExecutors.java
new file mode 100644
index 000000000..488ca7bac
--- /dev/null
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseExecutors.java
@@ -0,0 +1,383 @@
+/*
+ * Copyright (c) OSGi Alliance (2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.promise;
+
+import static java.util.Objects.requireNonNull;
+import static org.osgi.util.promise.PromiseImpl.uncaughtException;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * The executors for Promise callbacks and scheduled operations.
+ * <p>
+ * Instances of this class can be used to create a Deferred that can be resolved
+ * in the future as well as resolved Promises. The returned Deferred and Promise
+ * objects all use the executors used to construct this object for any callback
+ * or scheduled operation execution.
+ *
+ * @Immutable
+ * @author $Id$
+ * @since 1.1
+ */
+@ConsumerType
+public class PromiseExecutors {
+ /**
+ * The default executors.
+ */
+ final static PromiseExecutors defaultExecutors = new PromiseExecutors(
+ null, null);
+
+ /**
+ * The executor to use for callbacks. If {@code null}, the default
+ * callback executor is used.
+ */
+ private final Executor callbackExecutor;
+ /**
+ * The executor to use for scheduled operations. If {@code null}, the
+ * default scheduled executor is used.
+ */
+ private final ScheduledExecutorService scheduledExecutor;
+
+
+ /**
+ * Create a new PromiseExecutors with the specified callback executor.
+ * <p>
+ * The default scheduled executor will be used.
+ *
+ * @param callbackExecutor The executor to use for callbacks. {@code null}
+ * can be specified for the default callback executor.
+ */
+ public PromiseExecutors(Executor callbackExecutor) {
+ this(callbackExecutor, null);
+ }
+
+ /**
+ * Create a new PromiseExecutors with the specified callback executor and
+ * specified scheduled executor.
+ *
+ * @param callbackExecutor The executor to use for callbacks. {@code null}
+ * can be specified for the default callback executor.
+ * @param scheduledExecutor The scheduled executor for use for scheduled
+ * operations. {@code null} can be specified for the default
+ * scheduled executor.
+ */
+ public PromiseExecutors(Executor callbackExecutor,
+ ScheduledExecutorService scheduledExecutor) {
+ this.callbackExecutor = callbackExecutor;
+ this.scheduledExecutor = scheduledExecutor;
+ }
+
+ /**
+ * Returns the executor to use for callbacks.
+ *
+ * @return The executor to use for callbacks. This will be the default
+ * callback executor if {@code null} was specified for the callback
+ * executor when this PromiseExecutors was created.
+ */
+ protected Executor executor() {
+ if (callbackExecutor == null) {
+ return DefaultExecutors.callbackExecutor();
+ }
+ return callbackExecutor;
+ }
+
+ /**
+ * Returns the executor to use for scheduled operations.
+ *
+ * @return The executor to use for scheduled operations. This will be the
+ * default scheduled executor if {@code null} was specified for the
+ * scheduled executor when this PromiseExecutors was created.
+ */
+ protected ScheduledExecutorService scheduledExecutor() {
+ if (scheduledExecutor == null) {
+ return DefaultExecutors.scheduledExecutor();
+ }
+ return scheduledExecutor;
+ }
+
+ /**
+ * Create a new Deferred with the callback executor and scheduled executor
+ * of this PromiseExecutors object.
+ * <p>
+ * Use this method instead of {@link Deferred#Deferred()} to create a new
+ * {@link Deferred} whose associated Promise uses executors other than the
+ * default executors.
+ *
+ * @return A new {@link Deferred} with the callback and scheduled executors
+ * of this PromiseExecutors object
+ */
+ public <T> Deferred<T> deferred() {
+ return new Deferred<>(this);
+ }
+
+ /**
+ * Returns a new Promise that has been resolved with the specified value.
+ * <p>
+ * The returned Promise uses the callback executor and scheduled executor of
+ * this PromiseExecutors object
+ * <p>
+ * Use this method instead of {@link Promises#resolved(Object)} to create a
+ * Promise which uses executors other than the default executors.
+ *
+ * @param <T> The value type associated with the returned Promise.
+ * @param value The value of the resolved Promise.
+ * @return A new Promise that has been resolved with the specified value.
+ */
+ public <T> Promise<T> resolved(T value) {
+ return new PromiseImpl<>(value, null, this);
+ }
+
+ /**
+ * Returns a new Promise that has been resolved with the specified failure.
+ * <p>
+ * The returned Promise uses the callback executor and scheduled executor of
+ * this PromiseExecutors object
+ * <p>
+ * Use this method instead of {@link Promises#failed(Throwable)} to create a
+ * Promise which uses executors other than the default executors.
+ *
+ * @param <T> The value type associated with the returned Promise.
+ * @param failure The failure of the resolved Promise. Must not be
+ * {@code null}.
+ * @return A new Promise that has been resolved with the specified failure.
+ */
+ public <T> Promise<T> failed(Throwable failure) {
+ return new PromiseImpl<>(null, requireNonNull(failure), this);
+ }
+
+ /**
+ * Returns a new Promise that will hold the result of the specified task.
+ * <p>
+ * The specified task will be executed on the {@link #executor() callback
+ * executor}.
+ *
+ * @param task The task whose result will be available from the returned
+ * Promise.
+ * @return A new Promise that will hold the result of the specified task.
+ */
+ public <V> Promise<V> submit(final Callable< ? extends V> task) {
+ requireNonNull(task);
+ final Deferred<V> deferred = deferred();
+ try {
+ executor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ deferred.resolve(task.call());
+ } catch (Throwable t) {
+ deferred.fail(t);
+ }
+ }
+ });
+ } catch (Exception t) {
+ deferred.fail(t);
+ }
+ return deferred.getPromise();
+ }
+
+ /**
+ * Returns an Executor implementation that executes tasks immediately on the
+ * thread calling the {@code Executor.execute} method.
+ *
+ * @return An Executor implementation that executes tasks immediately on the
+ * thread calling the {@code Executor.execute} method.
+ */
+ public static Executor inlineExecutor() {
+ return new InlineExecutor();
+ }
+
+ /**
+ * An Executor implementation which executes the task immediately on the
+ * thread calling the {@code Executor.execute} method.
+ *
+ * @Immutable
+ */
+ private static final class InlineExecutor implements Executor {
+ InlineExecutor() {}
+
+ @Override
+ public void execute(Runnable callback) {
+ callback.run();
+ }
+ }
+
+ /**
+ * Default executors for Promises.
+ *
+ * @Immutable
+ */
+ private static final class DefaultExecutors
+ implements ThreadFactory, RejectedExecutionHandler, Runnable {
+ private static final DefaultExecutors callbacks;
+ private static final ScheduledExecutor scheduledExecutor;
+ private static final ThreadPoolExecutor callbackExecutor;
+ static {
+ callbacks = new DefaultExecutors();
+ scheduledExecutor = new ScheduledExecutor(2, callbacks);
+ callbackExecutor = new ThreadPoolExecutor(0, 64, 60L,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ callbacks, callbacks);
+ }
+
+ static Executor callbackExecutor() {
+ return callbackExecutor;
+ }
+
+ static ScheduledExecutorService scheduledExecutor() {
+ return scheduledExecutor;
+ }
+
+ private final AtomicBoolean shutdownHookInstalled;
+ private final ThreadFactory delegateThreadFactory;
+
+ private DefaultExecutors() {
+ shutdownHookInstalled = new AtomicBoolean();
+ delegateThreadFactory = Executors.defaultThreadFactory();
+ }
+
+ /**
+ * Executor threads should not prevent VM from exiting
+ */
+ @Override
+ public Thread newThread(Runnable r) {
+ if (shutdownHookInstalled.compareAndSet(false, true)) {
+ Thread shutdownThread = delegateThreadFactory.newThread(this);
+ shutdownThread.setName(
+ "ExecutorShutdownHook," + shutdownThread.getName());
+ try {
+ Runtime.getRuntime().addShutdownHook(shutdownThread);
+ } catch (IllegalStateException e) {
+ // VM is already shutting down...
+ callbackExecutor.shutdown();
+ scheduledExecutor.shutdown();
+ }
+ }
+ Thread t = delegateThreadFactory.newThread(r);
+ t.setName("PromiseImpl," + t.getName());
+ t.setDaemon(true);
+ return t;
+ }
+
+ /**
+ * Call the callback using the caller's thread because the thread pool
+ * rejected the execution.
+ */
+ @Override
+ public void rejectedExecution(Runnable callback,
+ ThreadPoolExecutor executor) {
+ try {
+ callback.run();
+ } catch (Throwable t) {
+ uncaughtException(t);
+ }
+ }
+
+ /**
+ * Shutdown hook
+ */
+ @Override
+ public void run() {
+ // limit new thread creation
+ callbackExecutor.setMaximumPoolSize(
+ Math.max(1, callbackExecutor.getPoolSize()));
+ // Run all delayed callbacks now
+ scheduledExecutor.shutdown();
+ BlockingQueue<Runnable> queue = scheduledExecutor.getQueue();
+ if (!queue.isEmpty()) {
+ for (Object r : queue.toArray()) {
+ if (r instanceof RunnableScheduledFuture< ? >) {
+ RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r;
+ if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L)
+ && queue.remove(future)) {
+ future.run();
+ scheduledExecutor.afterExecute(future, null);
+ }
+ }
+ }
+ scheduledExecutor.shutdown();
+ }
+ try {
+ scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ // Shutdown callback executor
+ callbackExecutor.shutdown();
+ try {
+ callbackExecutor.awaitTermination(20, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * ScheduledThreadPoolExecutor for scheduled execution.
+ *
+ * @ThreadSafe
+ */
+ private static final class ScheduledExecutor
+ extends ScheduledThreadPoolExecutor {
+ ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) {
+ super(corePoolSize, threadFactory);
+ }
+
+ /**
+ * Handle uncaught exceptions
+ */
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ if ((t == null) && (r instanceof Future< ? >)) {
+ boolean interrupted = Thread.interrupted();
+ try {
+ ((Future< ? >) r).get();
+ } catch (CancellationException e) {
+ // ignore
+ } catch (InterruptedException e) {
+ interrupted = true;
+ } catch (ExecutionException e) {
+ t = e.getCause();
+ } finally {
+ if (interrupted) { // restore interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ if (t != null) {
+ uncaughtException(t);
+ }
+ }
+ }
+ }
+}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
index 1a65fc8ef..5d02ff653 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
@@ -20,27 +20,13 @@ import static java.util.Objects.requireNonNull;
import java.lang.reflect.InvocationTargetException;
import java.util.NoSuchElementException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.RunnableScheduledFuture;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.osgi.util.function.Callback;
+import org.osgi.util.function.Consumer;
import org.osgi.util.function.Function;
import org.osgi.util.function.Predicate;
@@ -56,15 +42,9 @@ import org.osgi.util.function.Predicate;
*/
final class PromiseImpl<T> implements Promise<T> {
/**
- * The executor to use for callbacks. If {@code null}, the default callback
- * executor is used.
+ * The executors to use for callbacks and scheduled operations.
*/
- private final Executor callbackExecutor;
- /**
- * The executor to use for scheduled operations. If {@code null}, the
- * default scheduled executor is used.
- */
- private final ScheduledExecutorService scheduledExecutor;
+ private final PromiseExecutors executors;
/**
* A ConcurrentLinkedQueue to hold the callbacks for this Promise, so no
* additional synchronization is required to write to or read from the
@@ -104,16 +84,11 @@ final class PromiseImpl<T> implements Promise<T> {
/**
* Initialize this Promise.
*
- * @param callbackExecutor The executor to use for callbacks. {@code null}
- * can be specified for the default callback executor.
- * @param scheduledExecutor The scheduled executor for use for scheduled
- * operations. {@code null} can be specified for the default
- * scheduled executor.
+ * @param executors The executors to use for callbacks and scheduled
+ * operations.
*/
- PromiseImpl(Executor callbackExecutor,
- ScheduledExecutorService scheduledExecutor) {
- this.callbackExecutor = callbackExecutor;
- this.scheduledExecutor = scheduledExecutor;
+ PromiseImpl(PromiseExecutors executors) {
+ this.executors = executors;
callbacks = new ConcurrentLinkedQueue<>();
resolved = new CountDownLatch(1);
}
@@ -123,21 +98,16 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @param v The value of this resolved Promise.
* @param f The failure of this resolved Promise.
- * @param callbackExecutor The executor to use for callbacks. {@code null}
- * can be specified for the default callback executor.
- * @param scheduledExecutor The scheduled executor for use for scheduled
- * operations. {@code null} can be specified for the default
- * scheduled executor.
+ * @param executors The executors to use for callbacks and scheduled
+ * operations.
*/
- PromiseImpl(T v, Throwable f, Executor callbackExecutor,
- ScheduledExecutorService scheduledExecutor) {
+ PromiseImpl(T v, Throwable f, PromiseExecutors executors) {
if (f == null) {
value = v;
} else {
fail = f;
}
- this.callbackExecutor = callbackExecutor;
- this.scheduledExecutor = scheduledExecutor;
+ this.executors = executors;
callbacks = new ConcurrentLinkedQueue<>();
resolved = new CountDownLatch(0);
}
@@ -226,23 +196,13 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public String toString() {
- if (!isDone()) {
+ if (!isDone()) { // ensure latch open before reading state
return super.toString() + "[unresolved]";
}
- final boolean interrupted = Thread.interrupted();
- try {
- Throwable t = getFailure();
- if (t != null) {
- return super.toString() + "[failed: " + t + "]";
- }
- return super.toString() + "[resolved: " + getValue() + "]";
- } catch (InterruptedException | InvocationTargetException e) {
- return super.toString() + "[" + e + "]";
- } finally {
- if (interrupted) { // restore interrupt status
- Thread.currentThread().interrupt();
- }
+ if (fail == null) {
+ return super.toString() + "[resolved: " + value + "]";
}
+ return super.toString() + "[failed: " + fail + "]";
}
/**
@@ -262,20 +222,15 @@ final class PromiseImpl<T> implements Promise<T> {
if (resolved.getCount() != 0) {
return; // return if not resolved
}
-
/*
* Note: multiple threads can be in this method removing callbacks from
* the queue and executing them, so the order in which callbacks are
* executed cannot be specified.
*/
- Executor executor = callbackExecutor;
for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) {
- if (executor == null) {
- executor = DefaultExecutors.callbackExecutor();
- }
try {
try {
- executor.execute(callback);
+ executors.executor().execute(callback);
} catch (RejectedExecutionException e) {
callback.run();
}
@@ -302,13 +257,38 @@ final class PromiseImpl<T> implements Promise<T> {
/**
* {@inheritDoc}
+ *
+ * @since 1.1
*/
@Override
- public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) {
- PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
- onResolve(chained.new Then<>(this, success, failure));
- return chained;
+ public Promise<T> onSuccess(Consumer< ? super T> success) {
+ return onResolve(new OnSuccess(success));
+ }
+
+ /**
+ * A callback used for the {@link #onSuccess(Consumer)} method.
+ *
+ * @Immutable
+ * @since 1.1
+ */
+ private final class OnSuccess implements Runnable {
+ private final Consumer< ? super T> success;
+
+ OnSuccess(Consumer< ? super T> success) {
+ this.success = requireNonNull(success);
+ }
+
+ @Override
+ public void run() {
+ Result<T> result = collect();
+ if (result.fail == null) {
+ try {
+ success.accept(result.value);
+ } catch (Throwable e) {
+ uncaughtException(e);
+ }
+ }
+ }
}
/**
@@ -317,34 +297,67 @@ final class PromiseImpl<T> implements Promise<T> {
* @since 1.1
*/
@Override
- public <R> Promise<R> then(Success<? super T, ? extends R> success) {
- return then(success, null);
+ public Promise<T> onFailure(Consumer< ? super Throwable> failure) {
+ return onResolve(new OnFailure(failure));
+ }
+
+ /**
+ * A callback used for the {@link #onFailure(Consumer)} method.
+ *
+ * @Immutable
+ * @since 1.1
+ */
+ private final class OnFailure implements Runnable {
+ private final Consumer< ? super Throwable> failure;
+
+ OnFailure(Consumer< ? super Throwable> failure) {
+ this.failure = requireNonNull(failure);
+ }
+
+ @Override
+ public void run() {
+ Result<T> result = collect();
+ if (result.fail != null) {
+ try {
+ failure.accept(result.fail);
+ } catch (Throwable e) {
+ uncaughtException(e);
+ }
+ }
+ }
}
/**
* {@inheritDoc}
*/
@Override
- public Promise<T> then(Callback callback) {
- PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
- onResolve(chained.new Chain(this, callback));
+ public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) {
+ PromiseImpl<R> chained = new PromiseImpl<>(executors);
+ onResolve(chained.new Then<>(this, success, failure));
return chained;
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public <R> Promise<R> then(Success<? super T, ? extends R> success) {
+ return then(success, null);
+ }
+
+ /**
* A callback used to chain promises for the {@link #then(Success, Failure)}
* method.
*
* @Immutable
*/
private final class Then<P> implements Runnable {
- private final Promise<P> promise;
+ private final PromiseImpl<P> promise;
private final Success<P, ? extends T> success;
private final Failure failure;
@SuppressWarnings("unchecked")
- Then(Promise<P> promise, Success< ? super P, ? extends T> success,
+ Then(PromiseImpl<P> promise, Success< ? super P, ? extends T> success,
Failure failure) {
this.promise = promise;
this.success = (Success<P, ? extends T>) success;
@@ -353,13 +366,13 @@ final class PromiseImpl<T> implements Promise<T> {
@Override
public void run() {
- Throwable f = Result.collect(promise).fail;
- if (f != null) {
+ Result<P> result = promise.collect();
+ if (result.fail != null) {
if (failure != null) {
try {
failure.fail(promise);
} catch (Throwable e) {
- f = e; // propagate new exception
+ result.fail = e; // propagate new exception
}
}
} else if (success != null) {
@@ -367,7 +380,7 @@ final class PromiseImpl<T> implements Promise<T> {
try {
returned = success.call(promise);
} catch (Throwable e) {
- f = e; // propagate new exception
+ result.fail = e; // propagate new exception
}
if (returned != null) {
// resolve chained when returned promise is resolved
@@ -375,60 +388,95 @@ final class PromiseImpl<T> implements Promise<T> {
return;
}
}
- tryResolve(null, f);
+ tryResolve(null, result.fail);
}
}
/**
- * A callback used to resolve the chained Promise when the Promise promise
- * is resolved.
+ * A callback used to resolve the chained Promise when the Promise is
+ * resolved.
*
* @Immutable
*/
private final class Chain implements Runnable {
- private final Promise< ? extends T> promise;
- private final Throwable failure;
- private final Callback callback;
-
+ private final Promise< ? extends T> promise;
+
Chain(Promise< ? extends T> promise) {
this.promise = promise;
- this.failure = null;
- this.callback = null;
}
+
+ @Override
+ public void run() {
+ Result<T> result = collect(promise);
+ tryResolve(result.value, result.fail);
+ }
+ }
- Chain(Promise< ? extends T> promise, Throwable failure) {
+ /**
+ * A callback used to resolve the chained Promise when the PromiseImpl is
+ * resolved.
+ *
+ * @Immutable
+ * @since 1.1
+ */
+ private final class ChainImpl implements Runnable {
+ private final PromiseImpl<T> promise;
+
+ ChainImpl(PromiseImpl<T> promise) {
this.promise = promise;
- this.failure = requireNonNull(failure);
- this.callback = null;
}
+
+ @Override
+ public void run() {
+ Result<T> result = promise.collect();
+ tryResolve(result.value, result.fail);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @since 1.1
+ */
+ @Override
+ public Promise<T> thenAccept(Consumer< ? super T> consumer) {
+ PromiseImpl<T> chained = new PromiseImpl<>(executors);
+ onResolve(chained.new ThenAccept(this, consumer));
+ return chained;
+ }
- Chain(Promise< ? extends T> promise, Callback callback) {
+ /**
+ * A callback used to resolve the chained Promise and call a Consumer when
+ * the PromiseImpl is resolved.
+ *
+ * @Immutable
+ * @since 1.1
+ */
+ private final class ThenAccept implements Runnable {
+ private final PromiseImpl<T> promise;
+ private final Consumer< ? super T> consumer;
+
+ ThenAccept(PromiseImpl<T> promise, Consumer< ? super T> consumer) {
this.promise = promise;
- this.failure = null;
- this.callback = requireNonNull(callback);
+ this.consumer = requireNonNull(consumer);
}
-
+
@Override
public void run() {
- if (callback != null) {
+ Result<T> result = promise.collect();
+ if (result.fail == null) {
try {
- callback.run();
+ consumer.accept(result.value);
} catch (Throwable e) {
- tryResolve(null, e);
- return;
+ result.fail = e;
}
}
- Result<T> result = Result.collect(promise);
- if ((result.fail != null) && (failure != null)) {
- result.fail = failure;
- }
tryResolve(result.value, result.fail);
}
}
/**
* Resolve this Promise with the specified Promise.
- *
* <p>
* If the specified Promise is successfully resolved, this Promise is
* resolved with the value of the specified Promise. If the specified
@@ -436,7 +484,7 @@ final class PromiseImpl<T> implements Promise<T> {
* failure of the specified Promise.
*
* @param with A Promise whose value or failure must be used to resolve this
- * Promise. Must not be {@code null}.
+ * Promise. Must not be {@code null}.
* @return A Promise that is resolved only when this Promise is resolved by
* the specified Promise. The returned Promise must be successfully
* resolved with the value {@code null}, if this Promise was
@@ -446,37 +494,36 @@ final class PromiseImpl<T> implements Promise<T> {
* resolved.
*/
Promise<Void> resolveWith(Promise<? extends T> with) {
- PromiseImpl<Void> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
- with.onResolve(new ResolveWith(with, chained));
+ PromiseImpl<Void> chained = new PromiseImpl<>(executors);
+ with.onResolve(chained.new ResolveWith<>(this, with));
return chained;
}
/**
- * A callback used to resolve this Promise with another Promise for the
+ * A callback used to resolve a Promise with another Promise for the
* {@link PromiseImpl#resolveWith(Promise)} method.
*
* @Immutable
*/
- private final class ResolveWith implements Runnable {
- private final Promise< ? extends T> promise;
- private final PromiseImpl<Void> chained;
+ private final class ResolveWith<P> implements Runnable {
+ private final PromiseImpl<P> promise;
+ private final Promise< ? extends P> with;
- ResolveWith(Promise< ? extends T> promise, PromiseImpl<Void> chained) {
+ ResolveWith(PromiseImpl<P> promise, Promise< ? extends P> with) {
this.promise = promise;
- this.chained = chained;
+ this.with = requireNonNull(with);
}
@Override
public void run() {
Throwable f = null;
- Result<T> result = Result.collect(promise);
+ Result<P> result = collect(with);
try {
- resolve(result.value, result.fail);
+ promise.resolve(result.value, result.fail);
} catch (Throwable e) {
f = e; // propagate new exception
}
- chained.tryResolve(null, f);
+ tryResolve(null, f);
}
}
@@ -485,8 +532,7 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> filter(Predicate<? super T> predicate) {
- PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
+ PromiseImpl<T> chained = new PromiseImpl<>(executors);
onResolve(chained.new Filter(this, predicate));
return chained;
}
@@ -497,17 +543,17 @@ final class PromiseImpl<T> implements Promise<T> {
* @Immutable
*/
private final class Filter implements Runnable {
- private final Promise< ? extends T> promise;
+ private final PromiseImpl<T> promise;
private final Predicate<? super T> predicate;
- Filter(Promise< ? extends T> promise, Predicate< ? super T> predicate) {
+ Filter(PromiseImpl<T> promise, Predicate< ? super T> predicate) {
this.promise = promise;
this.predicate = requireNonNull(predicate);
}
@Override
public void run() {
- Result<T> result = Result.collect(promise);
+ Result<T> result = promise.collect();
if (result.fail == null) {
try {
if (!predicate.test(result.value)) {
@@ -526,8 +572,7 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public <R> Promise<R> map(Function<? super T, ? extends R> mapper) {
- PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
+ PromiseImpl<R> chained = new PromiseImpl<>(executors);
onResolve(chained.new Map<>(this, mapper));
return chained;
}
@@ -538,10 +583,10 @@ final class PromiseImpl<T> implements Promise<T> {
* @Immutable
*/
private final class Map<P> implements Runnable {
- private final Promise< ? extends P> promise;
+ private final PromiseImpl<P> promise;
private final Function<? super P, ? extends T> mapper;
- Map(Promise< ? extends P> promise,
+ Map(PromiseImpl<P> promise,
Function< ? super P, ? extends T> mapper) {
this.promise = promise;
this.mapper = requireNonNull(mapper);
@@ -549,7 +594,7 @@ final class PromiseImpl<T> implements Promise<T> {
@Override
public void run() {
- Result<P> result = Result.collect(promise);
+ Result<P> result = promise.collect();
T v = null;
if (result.fail == null) {
try {
@@ -567,8 +612,7 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) {
- PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
+ PromiseImpl<R> chained = new PromiseImpl<>(executors);
onResolve(chained.new FlatMap<>(this, mapper));
return chained;
}
@@ -579,10 +623,10 @@ final class PromiseImpl<T> implements Promise<T> {
* @Immutable
*/
private final class FlatMap<P> implements Runnable {
- private final Promise< ? extends P> promise;
+ private final PromiseImpl<P> promise;
private final Function< ? super P,Promise< ? extends T>> mapper;
- FlatMap(Promise< ? extends P> promise,
+ FlatMap(PromiseImpl<P> promise,
Function< ? super P,Promise< ? extends T>> mapper) {
this.promise = promise;
this.mapper = requireNonNull(mapper);
@@ -590,7 +634,7 @@ final class PromiseImpl<T> implements Promise<T> {
@Override
public void run() {
- Result<P> result = Result.collect(promise);
+ Result<P> result = promise.collect();
if (result.fail == null) {
Promise< ? extends T> flatmap = null;
try {
@@ -612,8 +656,7 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) {
- PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
+ PromiseImpl<T> chained = new PromiseImpl<>(executors);
onResolve(chained.new Recover(this, recovery));
return chained;
}
@@ -624,10 +667,10 @@ final class PromiseImpl<T> implements Promise<T> {
* @Immutable
*/
private final class Recover implements Runnable {
- private final Promise<T> promise;
+ private final PromiseImpl<T> promise;
private final Function<Promise< ? >, ? extends T> recovery;
- Recover(Promise<T> promise,
+ Recover(PromiseImpl<T> promise,
Function<Promise< ? >, ? extends T> recovery) {
this.promise = promise;
this.recovery = requireNonNull(recovery);
@@ -635,7 +678,7 @@ final class PromiseImpl<T> implements Promise<T> {
@Override
public void run() {
- Result<T> result = Result.collect(promise);
+ Result<T> result = promise.collect();
if (result.fail != null) {
try {
T v = recovery.apply(promise);
@@ -656,8 +699,7 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) {
- PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
+ PromiseImpl<T> chained = new PromiseImpl<>(executors);
onResolve(chained.new RecoverWith(this, recovery));
return chained;
}
@@ -668,10 +710,10 @@ final class PromiseImpl<T> implements Promise<T> {
* @Immutable
*/
private final class RecoverWith implements Runnable {
- private final Promise<T> promise;
+ private final PromiseImpl<T> promise;
private final Function<Promise<?>, Promise<? extends T>> recovery;
- RecoverWith(Promise<T> promise,
+ RecoverWith(PromiseImpl<T> promise,
Function<Promise< ? >,Promise< ? extends T>> recovery) {
this.promise = promise;
this.recovery = requireNonNull(recovery);
@@ -679,7 +721,7 @@ final class PromiseImpl<T> implements Promise<T> {
@Override
public void run() {
- Result<T> result = Result.collect(promise);
+ Result<T> result = promise.collect();
if (result.fail != null) {
Promise< ? extends T> recovered = null;
try {
@@ -701,8 +743,7 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> fallbackTo(Promise<? extends T> fallback) {
- PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
+ PromiseImpl<T> chained = new PromiseImpl<>(executors);
onResolve(chained.new FallbackTo(this, fallback));
return chained;
}
@@ -713,21 +754,47 @@ final class PromiseImpl<T> implements Promise<T> {
* @Immutable
*/
private final class FallbackTo implements Runnable {
- private final Promise<T> promise;
+ private final PromiseImpl<T> promise;
private final Promise<? extends T> fallback;
- FallbackTo(Promise<T> promise, Promise< ? extends T> fallback) {
+ FallbackTo(PromiseImpl<T> promise, Promise< ? extends T> fallback) {
this.promise = promise;
this.fallback = requireNonNull(fallback);
}
@Override
public void run() {
- Result<T> result = Result.collect(promise);
+ Result<T> result = promise.collect();
if (result.fail != null) {
- fallback.onResolve(new Chain(fallback, result.fail));
+ fallback.onResolve(new FallbackChain(fallback, result.fail));
return;
}
+ tryResolve(result.value, null);
+ }
+ }
+
+ /**
+ * A callback used to resolve the chained Promise when the fallback Promise
+ * is resolved.
+ *
+ * @Immutable
+ * @since 1.1
+ */
+ private final class FallbackChain implements Runnable {
+ private final Promise< ? extends T> fallback;
+ private final Throwable failure;
+
+ FallbackChain(Promise< ? extends T> fallback, Throwable failure) {
+ this.fallback = fallback;
+ this.failure = failure;
+ }
+
+ @Override
+ public void run() {
+ Result<T> result = collect(fallback);
+ if (result.fail != null) {
+ result.fail = failure;
+ }
tryResolve(result.value, result.fail);
}
}
@@ -739,13 +806,10 @@ final class PromiseImpl<T> implements Promise<T> {
*/
ScheduledFuture< ? > schedule(Runnable operation, long delay,
TimeUnit unit) {
- ScheduledExecutorService executor = scheduledExecutor;
- if (executor == null) {
- executor = DefaultExecutors.scheduledExecutor();
- }
try {
try {
- return executor.schedule(operation, delay, unit);
+ return executors.scheduledExecutor().schedule(operation, delay,
+ unit);
} catch (RejectedExecutionException e) {
operation.run();
}
@@ -762,12 +826,14 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> timeout(long millis) {
- PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
+ PromiseImpl<T> chained = new PromiseImpl<>(executors);
if (!isDone()) {
- onResolve(chained.new Timeout(millis, TimeUnit.MILLISECONDS));
+ PromiseImpl<T> timedout = new PromiseImpl<>(null,
+ new TimeoutException(), executors);
+ onResolve(new Timeout(chained.new ChainImpl(timedout), millis,
+ TimeUnit.MILLISECONDS));
}
- onResolve(chained.new Chain(this));
+ onResolve(chained.new ChainImpl(this));
return chained;
}
@@ -781,8 +847,8 @@ final class PromiseImpl<T> implements Promise<T> {
private final class Timeout implements Runnable {
private final ScheduledFuture< ? > future;
- Timeout(long timeout, TimeUnit unit) {
- future = schedule(new TimeoutAction(), timeout, unit);
+ Timeout(Runnable operation, long delay, TimeUnit unit) {
+ future = schedule(operation, delay, unit);
}
@Override
@@ -794,30 +860,14 @@ final class PromiseImpl<T> implements Promise<T> {
}
/**
- * Callback used to fail the Promise if the timeout expires.
- *
- * @Immutable
- * @since 1.1
- */
- private final class TimeoutAction implements Runnable {
- TimeoutAction() {}
-
- @Override
- public void run() {
- tryResolve(null, new TimeoutException());
- }
- }
-
- /**
* {@inheritDoc}
*
* @since 1.1
*/
@Override
public Promise<T> delay(long millis) {
- PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
- scheduledExecutor);
- onResolve(new Delay(chained.new Chain(this), millis,
+ PromiseImpl<T> chained = new PromiseImpl<>(executors);
+ onResolve(new Delay(chained.new ChainImpl(this), millis,
TimeUnit.MILLISECONDS));
return chained;
}
@@ -847,183 +897,68 @@ final class PromiseImpl<T> implements Promise<T> {
}
/**
- * Default executors for callbacks.
+ * A holder of the result of a Promise.
*
- * @Immutable
+ * @NotThreadSafe
* @since 1.1
*/
- private static final class DefaultExecutors
- implements ThreadFactory, RejectedExecutionHandler, Runnable {
- private static final DefaultExecutors callbacks;
- private static final ScheduledExecutor scheduledExecutor;
- private static final ThreadPoolExecutor callbackExecutor;
- static {
- callbacks = new DefaultExecutors();
- scheduledExecutor = new ScheduledExecutor(2, callbacks);
- callbackExecutor = new ThreadPoolExecutor(0, 64, 60L,
- TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
- callbacks, callbacks);
- }
-
- static Executor callbackExecutor() {
- return callbackExecutor;
- }
-
- static ScheduledExecutorService scheduledExecutor() {
- return scheduledExecutor;
- }
-
- private final AtomicBoolean shutdownHookInstalled;
- private final ThreadFactory delegateThreadFactory;
-
- private DefaultExecutors() {
- shutdownHookInstalled = new AtomicBoolean();
- delegateThreadFactory = Executors.defaultThreadFactory();
- }
-
- /**
- * Executor threads should not prevent VM from exiting
- */
- @Override
- public Thread newThread(Runnable r) {
- if (shutdownHookInstalled.compareAndSet(false, true)) {
- Thread shutdownThread = delegateThreadFactory.newThread(this);
- shutdownThread.setName("ExecutorShutdownHook,"
- + shutdownThread.getName());
- try {
- Runtime.getRuntime().addShutdownHook(shutdownThread);
- } catch (IllegalStateException e) {
- // VM is already shutting down...
- callbackExecutor.shutdown();
- scheduledExecutor.shutdown();
- }
- }
- Thread t = delegateThreadFactory.newThread(r);
- t.setName("PromiseImpl," + t.getName());
- t.setDaemon(true);
- return t;
+ static final class Result<P> {
+ Throwable fail;
+ P value;
+
+ Result(P value) {
+ this.value = value;
+ this.fail = null;
}
-
- /**
- * Call the callback using the caller's thread because the thread pool
- * rejected the execution.
- */
- @Override
- public void rejectedExecution(Runnable callback,
- ThreadPoolExecutor executor) {
- try {
- callback.run();
- } catch (Throwable t) {
- uncaughtException(t);
- }
+
+ Result(Throwable fail) {
+ this.value = null;
+ this.fail = fail;
}
+ }
- /**
- * Shutdown hook
- */
- @Override
- public void run() {
- // limit new thread creation
- callbackExecutor.setMaximumPoolSize(
- Math.max(1, callbackExecutor.getPoolSize()));
- // Run all delayed callbacks now
- scheduledExecutor.shutdown();
- BlockingQueue<Runnable> queue = scheduledExecutor.getQueue();
- if (!queue.isEmpty()) {
- for (Object r : queue.toArray()) {
- if (r instanceof RunnableScheduledFuture< ? >) {
- RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r;
- if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L)
- && queue.remove(future)) {
- future.run();
- scheduledExecutor.afterExecute(future, null);
- }
- }
- }
- scheduledExecutor.shutdown();
- }
- try {
- scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- // Shutdown callback executor
- callbackExecutor.shutdown();
- try {
- callbackExecutor.awaitTermination(20, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ /**
+ * Return a holder of the result of this PromiseImpl.
+ *
+ * @since 1.1
+ */
+ Result<T> collect() {
+ if (!isDone()) { // ensure latch open before reading state
+ return new Result<T>(new AssertionError("promise not resolved"));
}
-
- /**
- * ScheduledThreadPoolExecutor for scheduled execution.
- *
- * @ThreadSafe
- */
- private static final class ScheduledExecutor
- extends ScheduledThreadPoolExecutor {
- ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) {
- super(corePoolSize, threadFactory);
- }
-
- /**
- * Handle uncaught exceptions
- */
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- if ((t == null) && (r instanceof Future< ? >)) {
- boolean interrupted = Thread.interrupted();
- try {
- ((Future< ? >) r).get();
- } catch (CancellationException e) {
- // ignore
- } catch (InterruptedException e) {
- interrupted = true;
- } catch (ExecutionException e) {
- t = e.getCause();
- } finally {
- if (interrupted) { // restore interrupt status
- Thread.currentThread().interrupt();
- }
- }
- }
- if (t != null) {
- uncaughtException(t);
- }
- }
+ if (fail == null) {
+ return new Result<T>(value);
}
+ return new Result<T>(fail);
}
/**
- * A holder of the result of a promise.
+ * Return a holder of the result of the specified Promise.
*
- * @NotThreadSafe
* @since 1.1
*/
- static final class Result<P> {
- Throwable fail;
- P value;
-
- Result() {}
-
- static <R> Result<R> collect(Promise< ? extends R> promise) {
- Result<R> result = new Result<>();
- final boolean interrupted = Thread.interrupted();
- try {
- result.fail = promise.getFailure();
- if (result.fail == null) {
- result.value = promise.getValue();
- }
- } catch (Throwable e) {
- result.fail = e; // propagate new exception
- } finally {
- if (interrupted) { // restore interrupt status
- Thread.currentThread().interrupt();
- }
+ static <R> Result<R> collect(Promise< ? extends R> promise) {
+ if (promise instanceof PromiseImpl) {
+ @SuppressWarnings("unchecked")
+ PromiseImpl<R> impl = (PromiseImpl<R>) promise;
+ return impl.collect();
+ }
+ if (!promise.isDone()) {
+ return new Result<R>(new AssertionError("promise not resolved"));
+ }
+ final boolean interrupted = Thread.interrupted();
+ try {
+ Throwable fail = promise.getFailure();
+ if (fail == null) {
+ return new Result<R>(promise.getValue());
+ }
+ return new Result<R>(fail);
+ } catch (Throwable e) {
+ return new Result<R>(e); // propagate new exception
+ } finally {
+ if (interrupted) { // restore interrupt status
+ Thread.currentThread().interrupt();
}
- return result;
}
}
}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java
index 394bce4bb..a25bdf3c5 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java
@@ -16,7 +16,7 @@
package org.osgi.util.promise;
-import static java.util.Objects.requireNonNull;
+import static org.osgi.util.promise.PromiseExecutors.defaultExecutors;
import java.util.ArrayList;
import java.util.Arrays;
@@ -42,10 +42,13 @@ public class Promises {
*
* @param <T> The value type associated with the returned Promise.
* @param value The value of the resolved Promise.
- * @return A new Promise that has been resolved with the specified value.
+ * @return A new Promise which uses the default callback executor and
+ * default scheduled executor that has been resolved with the
+ * specified value.
+ * @see PromiseExecutors#resolved(Object)
*/
public static <T> Promise<T> resolved(T value) {
- return new PromiseImpl<>(value, null, null, null);
+ return defaultExecutors.resolved(value);
}
/**
@@ -54,10 +57,13 @@ public class Promises {
* @param <T> The value type associated with the returned Promise.
* @param failure The failure of the resolved Promise. Must not be
* {@code null}.
- * @return A new Promise that has been resolved with the specified failure.
+ * @return A new Promise which uses the default callback executor and
+ * default scheduled executor that has been resolved with the
+ * specified failure.
+ * @see PromiseExecutors#failed(Throwable)
*/
public static <T> Promise<T> failed(Throwable failure) {
- return new PromiseImpl<>(null, requireNonNull(failure), null, null);
+ return defaultExecutors.failed(failure);
}
/**
@@ -74,19 +80,21 @@ public class Promises {
* @param promises The Promises which must be resolved before the returned
* Promise must be resolved. Must not be {@code null} and all of
* the elements in the collection must not be {@code null}.
- * @return A Promise that is resolved only when all the specified Promises
- * are resolved. The returned Promise must be successfully resolved
- * with a List of the values in the order of the specified Promises
- * if all the specified Promises are successfully resolved. The List
- * in the returned Promise is the property of the caller and is
- * modifiable. The returned Promise must be resolved with a failure
- * of {@link FailedPromisesException} if any of the specified
- * Promises are resolved with a failure. The failure
+ * @return A Promise which uses the default callback executor and default
+ * scheduled executor that is resolved only when all the specified
+ * Promises are resolved. The returned Promise must be successfully
+ * resolved with a List of the values in the order of the specified
+ * Promises if all the specified Promises are successfully resolved.
+ * The List in the returned Promise is the property of the caller
+ * and is modifiable. The returned Promise must be resolved with a
+ * failure of {@link FailedPromisesException} if any of the
+ * specified Promises are resolved with a failure. The failure
* {@link FailedPromisesException} must contain all of the specified
* Promises which resolved with a failure.
+ * @see #all(Deferred, Collection)
*/
public static <T, S extends T> Promise<List<T>> all(Collection<Promise<S>> promises) {
- return all(new Deferred<List<T>>(), promises);
+ return all(defaultExecutors.<List<T>> deferred(), promises);
}
/**
@@ -118,6 +126,7 @@ public class Promises {
* {@link FailedPromisesException} must contain all of the specified
* Promises which resolved with a failure.
* @since 1.1
+ * @see PromiseExecutors#deferred()
*/
public static <T, S extends T> Promise<List<T>> all(
Deferred<List<T>> deferred, Collection<Promise<S>> promises) {
@@ -147,7 +156,8 @@ public class Promises {
* @param promises The Promises which must be resolved before the returned
* Promise must be resolved. Must not be {@code null} and all of
* the arguments must not be {@code null}.
- * @return A Promise that is resolved only when all the specified Promises
+ * @return A Promise which uses the default callback executor and scheduled
+ * executor that is resolved only when all the specified Promises
* are resolved. The returned Promise must be successfully resolved
* with a List of the values in the order of the specified Promises
* if all the specified Promises are successfully resolved. The List
@@ -157,6 +167,7 @@ public class Promises {
* Promises are resolved with a failure. The failure
* {@link FailedPromisesException} must contain all of the specified
* Promises which resolved with a failure.
+ * @see #all(Deferred, Promise...)
*/
@SafeVarargs
public static <T> Promise<List<T>> all(Promise<? extends T>... promises) {
@@ -191,6 +202,7 @@ public class Promises {
* {@link FailedPromisesException} must contain all of the specified
* Promises which resolved with a failure.
* @since 1.1
+ * @see PromiseExecutors#deferred()
*/
@SafeVarargs
public static <T> Promise<List<T>> all(Deferred<List<T>> deferred,
@@ -229,7 +241,7 @@ public class Promises {
List<Promise<?>> failed = new ArrayList<>(promises.size());
Throwable cause = null;
for (Promise<? extends T> promise : promises) {
- Result<T> result = Result.collect(promise);
+ Result<T> result = PromiseImpl.collect(promise);
if (result.fail != null) {
failed.add(promise);
if (cause == null) {

Back to the top