Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java')
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java247
1 files changed, 156 insertions, 91 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
index 2afb3771f..1a65fc8ef 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2017). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
package org.osgi.util.promise;
+import static java.util.Objects.requireNonNull;
+
import java.lang.reflect.InvocationTargetException;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
@@ -23,11 +25,13 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
@@ -42,18 +46,26 @@ import org.osgi.util.function.Predicate;
/**
* Promise implementation.
- *
* <p>
* This class is not used directly by clients. Clients should use
* {@link Deferred} to create a resolvable {@link Promise}.
*
* @param <T> The result type associated with the Promise.
- *
* @ThreadSafe
* @author $Id$
*/
final class PromiseImpl<T> implements Promise<T> {
/**
+ * The executor to use for callbacks. If {@code null}, the default callback
+ * executor is used.
+ */
+ private final Executor callbackExecutor;
+ /**
+ * The executor to use for scheduled operations. If {@code null}, the
+ * default scheduled executor is used.
+ */
+ private final ScheduledExecutorService scheduledExecutor;
+ /**
* A ConcurrentLinkedQueue to hold the callbacks for this Promise, so no
* additional synchronization is required to write to or read from the
* queue.
@@ -91,8 +103,17 @@ final class PromiseImpl<T> implements Promise<T> {
/**
* Initialize this Promise.
+ *
+ * @param callbackExecutor The executor to use for callbacks. {@code null}
+ * can be specified for the default callback executor.
+ * @param scheduledExecutor The scheduled executor for use for scheduled
+ * operations. {@code null} can be specified for the default
+ * scheduled executor.
*/
- PromiseImpl() {
+ PromiseImpl(Executor callbackExecutor,
+ ScheduledExecutorService scheduledExecutor) {
+ this.callbackExecutor = callbackExecutor;
+ this.scheduledExecutor = scheduledExecutor;
callbacks = new ConcurrentLinkedQueue<>();
resolved = new CountDownLatch(1);
}
@@ -102,13 +123,21 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @param v The value of this resolved Promise.
* @param f The failure of this resolved Promise.
+ * @param callbackExecutor The executor to use for callbacks. {@code null}
+ * can be specified for the default callback executor.
+ * @param scheduledExecutor The scheduled executor for use for scheduled
+ * operations. {@code null} can be specified for the default
+ * scheduled executor.
*/
- PromiseImpl(T v, Throwable f) {
+ PromiseImpl(T v, Throwable f, Executor callbackExecutor,
+ ScheduledExecutorService scheduledExecutor) {
if (f == null) {
value = v;
} else {
fail = f;
}
+ this.callbackExecutor = callbackExecutor;
+ this.scheduledExecutor = scheduledExecutor;
callbacks = new ConcurrentLinkedQueue<>();
resolved = new CountDownLatch(0);
}
@@ -164,24 +193,6 @@ final class PromiseImpl<T> implements Promise<T> {
}
/**
- * Call any registered callbacks if this Promise is resolved.
- */
- private void notifyCallbacks() {
- if (resolved.getCount() != 0) {
- return; // return if not resolved
- }
-
- /*
- * Note: multiple threads can be in this method removing callbacks from
- * the queue and executing them, so the order in which callbacks are
- * executed cannot be specified.
- */
- for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) {
- Callbacks.execute(callback);
- }
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -245,17 +256,65 @@ final class PromiseImpl<T> implements Promise<T> {
}
/**
+ * Call any registered callbacks if this Promise is resolved.
+ */
+ private void notifyCallbacks() {
+ if (resolved.getCount() != 0) {
+ return; // return if not resolved
+ }
+
+ /*
+ * Note: multiple threads can be in this method removing callbacks from
+ * the queue and executing them, so the order in which callbacks are
+ * executed cannot be specified.
+ */
+ Executor executor = callbackExecutor;
+ for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) {
+ if (executor == null) {
+ executor = DefaultExecutors.callbackExecutor();
+ }
+ try {
+ try {
+ executor.execute(callback);
+ } catch (RejectedExecutionException e) {
+ callback.run();
+ }
+ } catch (Throwable t) {
+ uncaughtException(t);
+ }
+ }
+ }
+
+ /**
+ * Handle an uncaught exception from a Runnable.
+ *
+ * @param t The uncaught exception.
+ * @since 1.1
+ */
+ static void uncaughtException(Throwable t) {
+ try {
+ Thread thread = Thread.currentThread();
+ thread.getUncaughtExceptionHandler().uncaughtException(thread, t);
+ } catch (Throwable ignored) {
+ // we ignore this
+ }
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) {
- PromiseImpl<R> chained = new PromiseImpl<>();
+ PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
onResolve(chained.new Then<>(this, success, failure));
return chained;
}
/**
* {@inheritDoc}
+ *
+ * @since 1.1
*/
@Override
public <R> Promise<R> then(Success<? super T, ? extends R> success) {
@@ -263,6 +322,17 @@ final class PromiseImpl<T> implements Promise<T> {
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public Promise<T> then(Callback callback) {
+ PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
+ onResolve(chained.new Chain(this, callback));
+ return chained;
+ }
+
+ /**
* A callback used to chain promises for the {@link #then(Success, Failure)}
* method.
*
@@ -357,16 +427,6 @@ final class PromiseImpl<T> implements Promise<T> {
}
/**
- * {@inheritDoc}
- */
- @Override
- public Promise<T> then(Callback callback) {
- PromiseImpl<T> chained = new PromiseImpl<>();
- onResolve(chained.new Chain(this, callback));
- return chained;
- }
-
- /**
* Resolve this Promise with the specified Promise.
*
* <p>
@@ -386,7 +446,8 @@ final class PromiseImpl<T> implements Promise<T> {
* resolved.
*/
Promise<Void> resolveWith(Promise<? extends T> with) {
- PromiseImpl<Void> chained = new PromiseImpl<>();
+ PromiseImpl<Void> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
with.onResolve(new ResolveWith(with, chained));
return chained;
}
@@ -424,7 +485,8 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> filter(Predicate<? super T> predicate) {
- PromiseImpl<T> chained = new PromiseImpl<>();
+ PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
onResolve(chained.new Filter(this, predicate));
return chained;
}
@@ -464,7 +526,8 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public <R> Promise<R> map(Function<? super T, ? extends R> mapper) {
- PromiseImpl<R> chained = new PromiseImpl<>();
+ PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
onResolve(chained.new Map<>(this, mapper));
return chained;
}
@@ -504,7 +567,8 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) {
- PromiseImpl<R> chained = new PromiseImpl<>();
+ PromiseImpl<R> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
onResolve(chained.new FlatMap<>(this, mapper));
return chained;
}
@@ -548,7 +612,8 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) {
- PromiseImpl<T> chained = new PromiseImpl<>();
+ PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
onResolve(chained.new Recover(this, recovery));
return chained;
}
@@ -591,7 +656,8 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) {
- PromiseImpl<T> chained = new PromiseImpl<>();
+ PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
onResolve(chained.new RecoverWith(this, recovery));
return chained;
}
@@ -635,7 +701,8 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> fallbackTo(Promise<? extends T> fallback) {
- PromiseImpl<T> chained = new PromiseImpl<>();
+ PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
onResolve(chained.new FallbackTo(this, fallback));
return chained;
}
@@ -666,13 +733,37 @@ final class PromiseImpl<T> implements Promise<T> {
}
/**
+ * Schedule a operation on the scheduled executor.
+ *
+ * @since 1.1
+ */
+ ScheduledFuture< ? > schedule(Runnable operation, long delay,
+ TimeUnit unit) {
+ ScheduledExecutorService executor = scheduledExecutor;
+ if (executor == null) {
+ executor = DefaultExecutors.scheduledExecutor();
+ }
+ try {
+ try {
+ return executor.schedule(operation, delay, unit);
+ } catch (RejectedExecutionException e) {
+ operation.run();
+ }
+ } catch (Throwable t) {
+ uncaughtException(t);
+ }
+ return null;
+ }
+
+ /**
* {@inheritDoc}
*
* @since 1.1
*/
@Override
public Promise<T> timeout(long millis) {
- PromiseImpl<T> chained = new PromiseImpl<>();
+ PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
if (!isDone()) {
onResolve(chained.new Timeout(millis, TimeUnit.MILLISECONDS));
}
@@ -691,7 +782,7 @@ final class PromiseImpl<T> implements Promise<T> {
private final ScheduledFuture< ? > future;
Timeout(long timeout, TimeUnit unit) {
- future = Callbacks.schedule(new TimeoutAction(), timeout, unit);
+ future = schedule(new TimeoutAction(), timeout, unit);
}
@Override
@@ -701,15 +792,16 @@ final class PromiseImpl<T> implements Promise<T> {
}
}
}
-
+
/**
* Callback used to fail the Promise if the timeout expires.
*
* @Immutable
+ * @since 1.1
*/
private final class TimeoutAction implements Runnable {
TimeoutAction() {}
-
+
@Override
public void run() {
tryResolve(null, new TimeoutException());
@@ -723,7 +815,8 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> delay(long millis) {
- PromiseImpl<T> chained = new PromiseImpl<>();
+ PromiseImpl<T> chained = new PromiseImpl<>(callbackExecutor,
+ scheduledExecutor);
onResolve(new Delay(chained.new Chain(this), millis,
TimeUnit.MILLISECONDS));
return chained;
@@ -736,76 +829,54 @@ final class PromiseImpl<T> implements Promise<T> {
* @Immutable
* @since 1.1
*/
- private static final class Delay implements Runnable {
- private final Runnable callback;
+ private final class Delay implements Runnable {
+ private final Runnable operation;
private final long delay;
private final TimeUnit unit;
- Delay(Runnable callback, long delay, TimeUnit unit) {
- this.callback = callback;
+ Delay(Runnable operation, long delay, TimeUnit unit) {
+ this.operation = operation;
this.delay = delay;
this.unit = unit;
}
@Override
public void run() {
- Callbacks.schedule(callback, delay, unit);
+ schedule(operation, delay, unit);
}
}
/**
- * Callback handler used to asynchronously execute callbacks.
+ * Default executors for callbacks.
*
* @Immutable
* @since 1.1
*/
- private static final class Callbacks
+ private static final class DefaultExecutors
implements ThreadFactory, RejectedExecutionHandler, Runnable {
- private static final Callbacks callbacks;
+ private static final DefaultExecutors callbacks;
private static final ScheduledExecutor scheduledExecutor;
private static final ThreadPoolExecutor callbackExecutor;
static {
- callbacks = new Callbacks();
+ callbacks = new DefaultExecutors();
scheduledExecutor = new ScheduledExecutor(2, callbacks);
callbackExecutor = new ThreadPoolExecutor(0, 64, 60L,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
callbacks, callbacks);
}
- /**
- * Schedule a callback on the scheduled executor
- */
- static ScheduledFuture< ? > schedule(Runnable callback, long delay,
- TimeUnit unit) {
- try {
- return scheduledExecutor.schedule(callback, delay, unit);
- } catch (RejectedExecutionException e) {
- callbacks.rejectedExecution(callback, scheduledExecutor);
- return null;
- }
+ static Executor callbackExecutor() {
+ return callbackExecutor;
}
- /**
- * Execute a callback on the callback executor
- */
- static void execute(Runnable callback) {
- callbackExecutor.execute(callback);
- }
-
- static void uncaughtException(Throwable t) {
- try {
- Thread thread = Thread.currentThread();
- thread.getUncaughtExceptionHandler().uncaughtException(thread,
- t);
- } catch (Throwable ignored) {
- // we ignore this
- }
+ static ScheduledExecutorService scheduledExecutor() {
+ return scheduledExecutor;
}
private final AtomicBoolean shutdownHookInstalled;
private final ThreadFactory delegateThreadFactory;
- private Callbacks() {
+ private DefaultExecutors() {
shutdownHookInstalled = new AtomicBoolean();
delegateThreadFactory = Executors.defaultThreadFactory();
}
@@ -929,13 +1000,14 @@ final class PromiseImpl<T> implements Promise<T> {
* A holder of the result of a promise.
*
* @NotThreadSafe
+ * @since 1.1
*/
static final class Result<P> {
Throwable fail;
P value;
-
+
Result() {}
-
+
static <R> Result<R> collect(Promise< ? extends R> promise) {
Result<R> result = new Result<>();
final boolean interrupted = Thread.interrupted();
@@ -954,11 +1026,4 @@ final class PromiseImpl<T> implements Promise<T> {
return result;
}
}
-
- static <V> V requireNonNull(V value) {
- if (value != null) {
- return value;
- }
- throw new NullPointerException();
- }
}

Back to the top