Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java')
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java624
1 files changed, 51 insertions, 573 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
index 5d02ff653..71ecd591b 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
@@ -18,10 +18,7 @@ package org.osgi.util.promise;
import static java.util.Objects.requireNonNull;
-import java.lang.reflect.InvocationTargetException;
-import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -31,178 +28,47 @@ import org.osgi.util.function.Function;
import org.osgi.util.function.Predicate;
/**
- * Promise implementation.
+ * Abstract Promise implementation.
* <p>
* This class is not used directly by clients. Clients should use
- * {@link Deferred} to create a resolvable {@link Promise}.
+ * {@link PromiseFactory} to create a {@link Promise}.
*
* @param <T> The result type associated with the Promise.
* @ThreadSafe
* @author $Id$
*/
-final class PromiseImpl<T> implements Promise<T> {
+abstract class PromiseImpl<T> implements Promise<T> {
/**
- * The executors to use for callbacks and scheduled operations.
+ * The factory to use for callbacks and scheduled operations.
*/
- private final PromiseExecutors executors;
+ private final PromiseFactory factory;
/**
* A ConcurrentLinkedQueue to hold the callbacks for this Promise, so no
* additional synchronization is required to write to or read from the
* queue.
*/
private final ConcurrentLinkedQueue<Runnable> callbacks;
- /**
- * A CountDownLatch to manage the resolved state of this Promise.
- * <p>
- * This object is used as the synchronizing object to provide a critical
- * section in {@link #tryResolve(Object, Throwable)} so that only a single
- * thread can write the resolved state variables and open the latch.
- * <p>
- * The resolved state variables, {@link #value} and {@link #fail}, must only
- * be written when the latch is closed (getCount() != 0) and must only be
- * read when the latch is open (getCount() == 0). The latch state must
- * always be checked before writing or reading since the resolved state
- * variables' memory consistency is guarded by the latch.
- */
- private final CountDownLatch resolved;
- /**
- * The value of this Promise if successfully resolved.
- *
- * @GuardedBy("resolved")
- * @see #resolved
- */
- private T value;
- /**
- * The failure of this Promise if resolved with a failure or {@code null} if
- * successfully resolved.
- *
- * @GuardedBy("resolved")
- * @see #resolved
- */
- private Throwable fail;
/**
* Initialize this Promise.
*
- * @param executors The executors to use for callbacks and scheduled
- * operations.
- */
- PromiseImpl(PromiseExecutors executors) {
- this.executors = executors;
- callbacks = new ConcurrentLinkedQueue<>();
- resolved = new CountDownLatch(1);
- }
-
- /**
- * Initialize and resolve this Promise.
- *
- * @param v The value of this resolved Promise.
- * @param f The failure of this resolved Promise.
- * @param executors The executors to use for callbacks and scheduled
+ * @param factory The factory to use for callbacks and scheduled
* operations.
*/
- PromiseImpl(T v, Throwable f, PromiseExecutors executors) {
- if (f == null) {
- value = v;
- } else {
- fail = f;
- }
- this.executors = executors;
+ PromiseImpl(PromiseFactory factory) {
+ this.factory = factory;
callbacks = new ConcurrentLinkedQueue<>();
- resolved = new CountDownLatch(0);
- }
-
- /**
- * Try to resolve this Promise.
- * <p>
- * If this Promise was already resolved, return false. Otherwise, resolve
- * this Promise and return true.
- *
- * @param v The value of this Promise.
- * @param f The failure of this Promise.
- * @return false if this Promise was already resolved; true if this method
- * resolved this Promise.
- * @since 1.1
- */
- boolean tryResolve(T v, Throwable f) {
- // critical section: only one resolver at a time
- synchronized (resolved) {
- if (resolved.getCount() == 0) {
- return false;
- }
- /*
- * The resolved state variables must be set before opening the
- * latch. This safely publishes them to be read by other threads
- * that must verify the latch is open before reading.
- */
- if (f == null) {
- value = v;
- } else {
- fail = f;
- }
- resolved.countDown();
- }
- notifyCallbacks(); // call any registered callbacks
- return true;
}
/**
- * Resolve this Promise.
- * <p>
- * If this Promise was already resolved, throw IllegalStateException.
- * Otherwise, resolve this Promise.
+ * Return a new {@link DeferredPromiseImpl} using the {@link PromiseFactory}
+ * of this PromiseImpl.
*
- * @param v The value of this Promise.
- * @param f The failure of this Promise.
- * @throws IllegalStateException If this Promise was already resolved.
- */
- void resolve(T v, Throwable f) {
- if (!tryResolve(v, f)) {
- throw new IllegalStateException("Already resolved");
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isDone() {
- return resolved.getCount() == 0;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public T getValue() throws InvocationTargetException, InterruptedException {
- resolved.await();
- if (fail == null) {
- return value;
- }
- throw new InvocationTargetException(fail);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Throwable getFailure() throws InterruptedException {
- resolved.await();
- return fail;
- }
-
- /**
+ * @return A new DeferredPromiseImpl.
* @since 1.1
*/
- @Override
- public String toString() {
- if (!isDone()) { // ensure latch open before reading state
- return super.toString() + "[unresolved]";
- }
- if (fail == null) {
- return super.toString() + "[resolved: " + value + "]";
- }
- return super.toString() + "[failed: " + fail + "]";
+ <V> DeferredPromiseImpl<V> deferred() {
+ return new DeferredPromiseImpl<>(factory);
}
/**
@@ -218,8 +84,8 @@ final class PromiseImpl<T> implements Promise<T> {
/**
* Call any registered callbacks if this Promise is resolved.
*/
- private void notifyCallbacks() {
- if (resolved.getCount() != 0) {
+ void notifyCallbacks() {
+ if (!isDone()) {
return; // return if not resolved
}
/*
@@ -230,7 +96,7 @@ final class PromiseImpl<T> implements Promise<T> {
for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) {
try {
try {
- executors.executor().execute(callback);
+ factory.executor().execute(callback);
} catch (RejectedExecutionException e) {
callback.run();
}
@@ -241,6 +107,26 @@ final class PromiseImpl<T> implements Promise<T> {
}
/**
+ * Schedule a operation on the scheduled executor.
+ *
+ * @since 1.1
+ */
+ ScheduledFuture< ? > schedule(Runnable operation, long delay,
+ TimeUnit unit) {
+ try {
+ try {
+ return factory.scheduledExecutor().schedule(operation, delay,
+ unit);
+ } catch (RejectedExecutionException e) {
+ operation.run();
+ }
+ } catch (Throwable t) {
+ uncaughtException(t);
+ }
+ return null;
+ }
+
+ /**
* Handle an uncaught exception from a Runnable.
*
* @param t The uncaught exception.
@@ -332,7 +218,7 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) {
- PromiseImpl<R> chained = new PromiseImpl<>(executors);
+ DeferredPromiseImpl<R> chained = deferred();
onResolve(chained.new Then<>(this, success, failure));
return chained;
}
@@ -346,494 +232,94 @@ final class PromiseImpl<T> implements Promise<T> {
}
/**
- * A callback used to chain promises for the {@link #then(Success, Failure)}
- * method.
- *
- * @Immutable
- */
- private final class Then<P> implements Runnable {
- private final PromiseImpl<P> promise;
- private final Success<P, ? extends T> success;
- private final Failure failure;
-
- @SuppressWarnings("unchecked")
- Then(PromiseImpl<P> promise, Success< ? super P, ? extends T> success,
- Failure failure) {
- this.promise = promise;
- this.success = (Success<P, ? extends T>) success;
- this.failure = failure;
- }
-
- @Override
- public void run() {
- Result<P> result = promise.collect();
- if (result.fail != null) {
- if (failure != null) {
- try {
- failure.fail(promise);
- } catch (Throwable e) {
- result.fail = e; // propagate new exception
- }
- }
- } else if (success != null) {
- Promise< ? extends T> returned = null;
- try {
- returned = success.call(promise);
- } catch (Throwable e) {
- result.fail = e; // propagate new exception
- }
- if (returned != null) {
- // resolve chained when returned promise is resolved
- returned.onResolve(new Chain(returned));
- return;
- }
- }
- tryResolve(null, result.fail);
- }
- }
-
- /**
- * A callback used to resolve the chained Promise when the Promise is
- * resolved.
- *
- * @Immutable
- */
- private final class Chain implements Runnable {
- private final Promise< ? extends T> promise;
-
- Chain(Promise< ? extends T> promise) {
- this.promise = promise;
- }
-
- @Override
- public void run() {
- Result<T> result = collect(promise);
- tryResolve(result.value, result.fail);
- }
- }
-
- /**
- * A callback used to resolve the chained Promise when the PromiseImpl is
- * resolved.
- *
- * @Immutable
- * @since 1.1
- */
- private final class ChainImpl implements Runnable {
- private final PromiseImpl<T> promise;
-
- ChainImpl(PromiseImpl<T> promise) {
- this.promise = promise;
- }
-
- @Override
- public void run() {
- Result<T> result = promise.collect();
- tryResolve(result.value, result.fail);
- }
- }
-
- /**
* {@inheritDoc}
*
* @since 1.1
*/
@Override
public Promise<T> thenAccept(Consumer< ? super T> consumer) {
- PromiseImpl<T> chained = new PromiseImpl<>(executors);
+ DeferredPromiseImpl<T> chained = deferred();
onResolve(chained.new ThenAccept(this, consumer));
return chained;
}
- /**
- * A callback used to resolve the chained Promise and call a Consumer when
- * the PromiseImpl is resolved.
- *
- * @Immutable
- * @since 1.1
- */
- private final class ThenAccept implements Runnable {
- private final PromiseImpl<T> promise;
- private final Consumer< ? super T> consumer;
-
- ThenAccept(PromiseImpl<T> promise, Consumer< ? super T> consumer) {
- this.promise = promise;
- this.consumer = requireNonNull(consumer);
- }
-
- @Override
- public void run() {
- Result<T> result = promise.collect();
- if (result.fail == null) {
- try {
- consumer.accept(result.value);
- } catch (Throwable e) {
- result.fail = e;
- }
- }
- tryResolve(result.value, result.fail);
- }
- }
-
- /**
- * Resolve this Promise with the specified Promise.
- * <p>
- * If the specified Promise is successfully resolved, this Promise is
- * resolved with the value of the specified Promise. If the specified
- * Promise is resolved with a failure, this Promise is resolved with the
- * failure of the specified Promise.
- *
- * @param with A Promise whose value or failure must be used to resolve this
- * Promise. Must not be {@code null}.
- * @return A Promise that is resolved only when this Promise is resolved by
- * the specified Promise. The returned Promise must be successfully
- * resolved with the value {@code null}, if this Promise was
- * resolved by the specified Promise. The returned Promise must be
- * resolved with a failure of {@link IllegalStateException}, if this
- * Promise was already resolved when the specified Promise was
- * resolved.
- */
- Promise<Void> resolveWith(Promise<? extends T> with) {
- PromiseImpl<Void> chained = new PromiseImpl<>(executors);
- with.onResolve(chained.new ResolveWith<>(this, with));
- return chained;
- }
-
- /**
- * A callback used to resolve a Promise with another Promise for the
- * {@link PromiseImpl#resolveWith(Promise)} method.
- *
- * @Immutable
- */
- private final class ResolveWith<P> implements Runnable {
- private final PromiseImpl<P> promise;
- private final Promise< ? extends P> with;
-
- ResolveWith(PromiseImpl<P> promise, Promise< ? extends P> with) {
- this.promise = promise;
- this.with = requireNonNull(with);
- }
-
- @Override
- public void run() {
- Throwable f = null;
- Result<P> result = collect(with);
- try {
- promise.resolve(result.value, result.fail);
- } catch (Throwable e) {
- f = e; // propagate new exception
- }
- tryResolve(null, f);
- }
- }
/**
* {@inheritDoc}
*/
@Override
public Promise<T> filter(Predicate<? super T> predicate) {
- PromiseImpl<T> chained = new PromiseImpl<>(executors);
+ DeferredPromiseImpl<T> chained = deferred();
onResolve(chained.new Filter(this, predicate));
return chained;
}
/**
- * A callback used by the {@link PromiseImpl#filter(Predicate)} method.
- *
- * @Immutable
- */
- private final class Filter implements Runnable {
- private final PromiseImpl<T> promise;
- private final Predicate<? super T> predicate;
-
- Filter(PromiseImpl<T> promise, Predicate< ? super T> predicate) {
- this.promise = promise;
- this.predicate = requireNonNull(predicate);
- }
-
- @Override
- public void run() {
- Result<T> result = promise.collect();
- if (result.fail == null) {
- try {
- if (!predicate.test(result.value)) {
- result.fail = new NoSuchElementException();
- }
- } catch (Throwable e) { // propagate new exception
- result.fail = e;
- }
- }
- tryResolve(result.value, result.fail);
- }
- }
-
- /**
* {@inheritDoc}
*/
@Override
public <R> Promise<R> map(Function<? super T, ? extends R> mapper) {
- PromiseImpl<R> chained = new PromiseImpl<>(executors);
+ DeferredPromiseImpl<R> chained = deferred();
onResolve(chained.new Map<>(this, mapper));
return chained;
}
- /**
- * A callback used by the {@link PromiseImpl#map(Function)} method.
- *
- * @Immutable
- */
- private final class Map<P> implements Runnable {
- private final PromiseImpl<P> promise;
- private final Function<? super P, ? extends T> mapper;
-
- Map(PromiseImpl<P> promise,
- Function< ? super P, ? extends T> mapper) {
- this.promise = promise;
- this.mapper = requireNonNull(mapper);
- }
-
- @Override
- public void run() {
- Result<P> result = promise.collect();
- T v = null;
- if (result.fail == null) {
- try {
- v = mapper.apply(result.value);
- } catch (Throwable e) { // propagate new exception
- result.fail = e;
- }
- }
- tryResolve(v, result.fail);
- }
- }
/**
* {@inheritDoc}
*/
@Override
public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) {
- PromiseImpl<R> chained = new PromiseImpl<>(executors);
+ DeferredPromiseImpl<R> chained = deferred();
onResolve(chained.new FlatMap<>(this, mapper));
return chained;
}
/**
- * A callback used by the {@link PromiseImpl#flatMap(Function)} method.
- *
- * @Immutable
- */
- private final class FlatMap<P> implements Runnable {
- private final PromiseImpl<P> promise;
- private final Function< ? super P,Promise< ? extends T>> mapper;
-
- FlatMap(PromiseImpl<P> promise,
- Function< ? super P,Promise< ? extends T>> mapper) {
- this.promise = promise;
- this.mapper = requireNonNull(mapper);
- }
-
- @Override
- public void run() {
- Result<P> result = promise.collect();
- if (result.fail == null) {
- Promise< ? extends T> flatmap = null;
- try {
- flatmap = mapper.apply(result.value);
- } catch (Throwable e) { // propagate new exception
- result.fail = e;
- }
- if (flatmap != null) {
- flatmap.onResolve(new Chain(flatmap));
- return;
- }
- }
- tryResolve(null, result.fail);
- }
- }
-
- /**
* {@inheritDoc}
*/
@Override
public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) {
- PromiseImpl<T> chained = new PromiseImpl<>(executors);
+ DeferredPromiseImpl<T> chained = deferred();
onResolve(chained.new Recover(this, recovery));
return chained;
}
/**
- * A callback used by the {@link PromiseImpl#recover(Function)} method.
- *
- * @Immutable
- */
- private final class Recover implements Runnable {
- private final PromiseImpl<T> promise;
- private final Function<Promise< ? >, ? extends T> recovery;
-
- Recover(PromiseImpl<T> promise,
- Function<Promise< ? >, ? extends T> recovery) {
- this.promise = promise;
- this.recovery = requireNonNull(recovery);
- }
-
- @Override
- public void run() {
- Result<T> result = promise.collect();
- if (result.fail != null) {
- try {
- T v = recovery.apply(promise);
- if (v != null) {
- result.value = v;
- result.fail = null;
- }
- } catch (Throwable e) { // propagate new exception
- result.fail = e;
- }
- }
- tryResolve(result.value, result.fail);
- }
- }
-
- /**
* {@inheritDoc}
*/
@Override
public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) {
- PromiseImpl<T> chained = new PromiseImpl<>(executors);
+ DeferredPromiseImpl<T> chained = deferred();
onResolve(chained.new RecoverWith(this, recovery));
return chained;
}
/**
- * A callback used by the {@link PromiseImpl#recoverWith(Function)} method.
- *
- * @Immutable
- */
- private final class RecoverWith implements Runnable {
- private final PromiseImpl<T> promise;
- private final Function<Promise<?>, Promise<? extends T>> recovery;
-
- RecoverWith(PromiseImpl<T> promise,
- Function<Promise< ? >,Promise< ? extends T>> recovery) {
- this.promise = promise;
- this.recovery = requireNonNull(recovery);
- }
-
- @Override
- public void run() {
- Result<T> result = promise.collect();
- if (result.fail != null) {
- Promise< ? extends T> recovered = null;
- try {
- recovered = recovery.apply(promise);
- } catch (Throwable e) { // propagate new exception
- result.fail = e;
- }
- if (recovered != null) {
- recovered.onResolve(new Chain(recovered));
- return;
- }
- }
- tryResolve(result.value, result.fail);
- }
- }
-
- /**
* {@inheritDoc}
*/
@Override
public Promise<T> fallbackTo(Promise<? extends T> fallback) {
- PromiseImpl<T> chained = new PromiseImpl<>(executors);
+ DeferredPromiseImpl<T> chained = deferred();
onResolve(chained.new FallbackTo(this, fallback));
return chained;
}
/**
- * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method.
- *
- * @Immutable
- */
- private final class FallbackTo implements Runnable {
- private final PromiseImpl<T> promise;
- private final Promise<? extends T> fallback;
-
- FallbackTo(PromiseImpl<T> promise, Promise< ? extends T> fallback) {
- this.promise = promise;
- this.fallback = requireNonNull(fallback);
- }
-
- @Override
- public void run() {
- Result<T> result = promise.collect();
- if (result.fail != null) {
- fallback.onResolve(new FallbackChain(fallback, result.fail));
- return;
- }
- tryResolve(result.value, null);
- }
- }
-
- /**
- * A callback used to resolve the chained Promise when the fallback Promise
- * is resolved.
- *
- * @Immutable
- * @since 1.1
- */
- private final class FallbackChain implements Runnable {
- private final Promise< ? extends T> fallback;
- private final Throwable failure;
-
- FallbackChain(Promise< ? extends T> fallback, Throwable failure) {
- this.fallback = fallback;
- this.failure = failure;
- }
-
- @Override
- public void run() {
- Result<T> result = collect(fallback);
- if (result.fail != null) {
- result.fail = failure;
- }
- tryResolve(result.value, result.fail);
- }
- }
-
- /**
- * Schedule a operation on the scheduled executor.
- *
- * @since 1.1
- */
- ScheduledFuture< ? > schedule(Runnable operation, long delay,
- TimeUnit unit) {
- try {
- try {
- return executors.scheduledExecutor().schedule(operation, delay,
- unit);
- } catch (RejectedExecutionException e) {
- operation.run();
- }
- } catch (Throwable t) {
- uncaughtException(t);
- }
- return null;
- }
-
- /**
* {@inheritDoc}
*
* @since 1.1
*/
@Override
public Promise<T> timeout(long millis) {
- PromiseImpl<T> chained = new PromiseImpl<>(executors);
+ DeferredPromiseImpl<T> chained = deferred();
+ onResolve(chained.new ChainImpl(this));
if (!isDone()) {
- PromiseImpl<T> timedout = new PromiseImpl<>(null,
- new TimeoutException(), executors);
+ PromiseImpl<T> timedout = new FailedPromiseImpl<>(
+ new TimeoutException(), factory);
onResolve(new Timeout(chained.new ChainImpl(timedout), millis,
TimeUnit.MILLISECONDS));
}
- onResolve(chained.new ChainImpl(this));
return chained;
}
@@ -866,7 +352,7 @@ final class PromiseImpl<T> implements Promise<T> {
*/
@Override
public Promise<T> delay(long millis) {
- PromiseImpl<T> chained = new PromiseImpl<>(executors);
+ DeferredPromiseImpl<T> chained = deferred();
onResolve(new Delay(chained.new ChainImpl(this), millis,
TimeUnit.MILLISECONDS));
return chained;
@@ -903,8 +389,8 @@ final class PromiseImpl<T> implements Promise<T> {
* @since 1.1
*/
static final class Result<P> {
- Throwable fail;
P value;
+ Throwable fail;
Result(P value) {
this.value = value;
@@ -922,15 +408,7 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @since 1.1
*/
- Result<T> collect() {
- if (!isDone()) { // ensure latch open before reading state
- return new Result<T>(new AssertionError("promise not resolved"));
- }
- if (fail == null) {
- return new Result<T>(value);
- }
- return new Result<T>(fail);
- }
+ abstract Result<T> collect();
/**
* Return a holder of the result of the specified Promise.

Back to the top