diff options
Diffstat (limited to 'bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java')
-rw-r--r-- | bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java | 147 |
1 files changed, 63 insertions, 84 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java index 71ecd591b..d78ae3b66 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java @@ -56,7 +56,7 @@ abstract class PromiseImpl<T> implements Promise<T> { * operations. */ PromiseImpl(PromiseFactory factory) { - this.factory = factory; + this.factory = requireNonNull(factory); callbacks = new ConcurrentLinkedQueue<>(); } @@ -72,6 +72,30 @@ abstract class PromiseImpl<T> implements Promise<T> { } /** + * Return a new {@link ResolvedPromiseImpl} using the {@link PromiseFactory} + * of this PromiseImpl. + * + * @param v Value for the ResolvedPromiseImpl. + * @return A new ResolvedPromiseImpl. + * @since 1.1 + */ + <V> ResolvedPromiseImpl<V> resolved(V v) { + return new ResolvedPromiseImpl<>(v, factory); + } + + /** + * Return a new {@link FailedPromiseImpl} using the {@link PromiseFactory} + * of this PromiseImpl. + * + * @param f Failure for the FailedPromiseImpl. + * @return A new FailedPromiseImpl. + * @since 1.1 + */ + <V> FailedPromiseImpl<V> failed(Throwable f) { + return new FailedPromiseImpl<>(f, factory); + } + + /** * {@inheritDoc} */ @Override @@ -142,9 +166,25 @@ abstract class PromiseImpl<T> implements Promise<T> { } /** - * {@inheritDoc} + * Run the specified chain when the specified promise is resolved. * - * @since 1.1 + * @param promise The promise associated with the chain. + * @param chain The chain to run when the promise is resolved. + */ + static <V> void chain(Promise<V> promise, Runnable chain) { + if (promise.isDone()) { + try { + chain.run(); + } catch (Throwable t) { + uncaughtException(t); + } + } else { + promise.onResolve(chain); + } + } + + /** + * {@inheritDoc} */ @Override public Promise<T> onSuccess(Consumer< ? super T> success) { @@ -179,8 +219,6 @@ abstract class PromiseImpl<T> implements Promise<T> { /** * {@inheritDoc} - * - * @since 1.1 */ @Override public Promise<T> onFailure(Consumer< ? super Throwable> failure) { @@ -219,8 +257,8 @@ abstract class PromiseImpl<T> implements Promise<T> { @Override public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) { DeferredPromiseImpl<R> chained = deferred(); - onResolve(chained.new Then<>(this, success, failure)); - return chained; + chain(this, chained.new Then<>(this, success, failure)); + return chained.orDone(); } /** @@ -233,14 +271,12 @@ abstract class PromiseImpl<T> implements Promise<T> { /** * {@inheritDoc} - * - * @since 1.1 */ @Override public Promise<T> thenAccept(Consumer< ? super T> consumer) { DeferredPromiseImpl<T> chained = deferred(); - onResolve(chained.new ThenAccept(this, consumer)); - return chained; + chain(this, chained.new ThenAccept(this, consumer)); + return chained.orDone(); } @@ -250,8 +286,8 @@ abstract class PromiseImpl<T> implements Promise<T> { @Override public Promise<T> filter(Predicate<? super T> predicate) { DeferredPromiseImpl<T> chained = deferred(); - onResolve(chained.new Filter(this, predicate)); - return chained; + chain(this, chained.new Filter(this, predicate)); + return chained.orDone(); } /** @@ -260,8 +296,8 @@ abstract class PromiseImpl<T> implements Promise<T> { @Override public <R> Promise<R> map(Function<? super T, ? extends R> mapper) { DeferredPromiseImpl<R> chained = deferred(); - onResolve(chained.new Map<>(this, mapper)); - return chained; + chain(this, chained.new Map<>(this, mapper)); + return chained.orDone(); } @@ -271,8 +307,8 @@ abstract class PromiseImpl<T> implements Promise<T> { @Override public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) { DeferredPromiseImpl<R> chained = deferred(); - onResolve(chained.new FlatMap<>(this, mapper)); - return chained; + chain(this, chained.new FlatMap<>(this, mapper)); + return chained.orDone(); } /** @@ -281,8 +317,8 @@ abstract class PromiseImpl<T> implements Promise<T> { @Override public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) { DeferredPromiseImpl<T> chained = deferred(); - onResolve(chained.new Recover(this, recovery)); - return chained; + chain(this, chained.new Recover(this, recovery)); + return chained.orDone(); } /** @@ -291,8 +327,8 @@ abstract class PromiseImpl<T> implements Promise<T> { @Override public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) { DeferredPromiseImpl<T> chained = deferred(); - onResolve(chained.new RecoverWith(this, recovery)); - return chained; + chain(this, chained.new RecoverWith(this, recovery)); + return chained.orDone(); } /** @@ -301,85 +337,28 @@ abstract class PromiseImpl<T> implements Promise<T> { @Override public Promise<T> fallbackTo(Promise<? extends T> fallback) { DeferredPromiseImpl<T> chained = deferred(); - onResolve(chained.new FallbackTo(this, fallback)); - return chained; + chain(this, chained.new FallbackTo(this, fallback)); + return chained.orDone(); } /** * {@inheritDoc} - * - * @since 1.1 */ @Override public Promise<T> timeout(long millis) { DeferredPromiseImpl<T> chained = deferred(); - onResolve(chained.new ChainImpl(this)); - if (!isDone()) { - PromiseImpl<T> timedout = new FailedPromiseImpl<>( - new TimeoutException(), factory); - onResolve(new Timeout(chained.new ChainImpl(timedout), millis, - TimeUnit.MILLISECONDS)); - } - return chained; - } - - /** - * Timeout class used by the {@link PromiseImpl#timeout(long)} method to - * cancel timeout when the Promise is resolved. - * - * @Immutable - * @since 1.1 - */ - private final class Timeout implements Runnable { - private final ScheduledFuture< ? > future; - - Timeout(Runnable operation, long delay, TimeUnit unit) { - future = schedule(operation, delay, unit); - } - - @Override - public void run() { - if (future != null) { - future.cancel(false); - } - } + chain(this, chained.new Timeout(this, millis)); + return chained.orDone(); } /** * {@inheritDoc} - * - * @since 1.1 */ @Override public Promise<T> delay(long millis) { DeferredPromiseImpl<T> chained = deferred(); - onResolve(new Delay(chained.new ChainImpl(this), millis, - TimeUnit.MILLISECONDS)); - return chained; - } - - /** - * Delay class used by the {@link PromiseImpl#delay(long)} method to delay - * chaining a promise. - * - * @Immutable - * @since 1.1 - */ - private final class Delay implements Runnable { - private final Runnable operation; - private final long delay; - private final TimeUnit unit; - - Delay(Runnable operation, long delay, TimeUnit unit) { - this.operation = operation; - this.delay = delay; - this.unit = unit; - } - - @Override - public void run() { - schedule(operation, delay, unit); - } + chain(this, chained.new Delay(this, millis)); + return chained.orDone(); } /** |