Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java')
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java147
1 files changed, 63 insertions, 84 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
index 71ecd591b..d78ae3b66 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
@@ -56,7 +56,7 @@ abstract class PromiseImpl<T> implements Promise<T> {
* operations.
*/
PromiseImpl(PromiseFactory factory) {
- this.factory = factory;
+ this.factory = requireNonNull(factory);
callbacks = new ConcurrentLinkedQueue<>();
}
@@ -72,6 +72,30 @@ abstract class PromiseImpl<T> implements Promise<T> {
}
/**
+ * Return a new {@link ResolvedPromiseImpl} using the {@link PromiseFactory}
+ * of this PromiseImpl.
+ *
+ * @param v Value for the ResolvedPromiseImpl.
+ * @return A new ResolvedPromiseImpl.
+ * @since 1.1
+ */
+ <V> ResolvedPromiseImpl<V> resolved(V v) {
+ return new ResolvedPromiseImpl<>(v, factory);
+ }
+
+ /**
+ * Return a new {@link FailedPromiseImpl} using the {@link PromiseFactory}
+ * of this PromiseImpl.
+ *
+ * @param f Failure for the FailedPromiseImpl.
+ * @return A new FailedPromiseImpl.
+ * @since 1.1
+ */
+ <V> FailedPromiseImpl<V> failed(Throwable f) {
+ return new FailedPromiseImpl<>(f, factory);
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
@@ -142,9 +166,25 @@ abstract class PromiseImpl<T> implements Promise<T> {
}
/**
- * {@inheritDoc}
+ * Run the specified chain when the specified promise is resolved.
*
- * @since 1.1
+ * @param promise The promise associated with the chain.
+ * @param chain The chain to run when the promise is resolved.
+ */
+ static <V> void chain(Promise<V> promise, Runnable chain) {
+ if (promise.isDone()) {
+ try {
+ chain.run();
+ } catch (Throwable t) {
+ uncaughtException(t);
+ }
+ } else {
+ promise.onResolve(chain);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
*/
@Override
public Promise<T> onSuccess(Consumer< ? super T> success) {
@@ -179,8 +219,6 @@ abstract class PromiseImpl<T> implements Promise<T> {
/**
* {@inheritDoc}
- *
- * @since 1.1
*/
@Override
public Promise<T> onFailure(Consumer< ? super Throwable> failure) {
@@ -219,8 +257,8 @@ abstract class PromiseImpl<T> implements Promise<T> {
@Override
public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) {
DeferredPromiseImpl<R> chained = deferred();
- onResolve(chained.new Then<>(this, success, failure));
- return chained;
+ chain(this, chained.new Then<>(this, success, failure));
+ return chained.orDone();
}
/**
@@ -233,14 +271,12 @@ abstract class PromiseImpl<T> implements Promise<T> {
/**
* {@inheritDoc}
- *
- * @since 1.1
*/
@Override
public Promise<T> thenAccept(Consumer< ? super T> consumer) {
DeferredPromiseImpl<T> chained = deferred();
- onResolve(chained.new ThenAccept(this, consumer));
- return chained;
+ chain(this, chained.new ThenAccept(this, consumer));
+ return chained.orDone();
}
@@ -250,8 +286,8 @@ abstract class PromiseImpl<T> implements Promise<T> {
@Override
public Promise<T> filter(Predicate<? super T> predicate) {
DeferredPromiseImpl<T> chained = deferred();
- onResolve(chained.new Filter(this, predicate));
- return chained;
+ chain(this, chained.new Filter(this, predicate));
+ return chained.orDone();
}
/**
@@ -260,8 +296,8 @@ abstract class PromiseImpl<T> implements Promise<T> {
@Override
public <R> Promise<R> map(Function<? super T, ? extends R> mapper) {
DeferredPromiseImpl<R> chained = deferred();
- onResolve(chained.new Map<>(this, mapper));
- return chained;
+ chain(this, chained.new Map<>(this, mapper));
+ return chained.orDone();
}
@@ -271,8 +307,8 @@ abstract class PromiseImpl<T> implements Promise<T> {
@Override
public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) {
DeferredPromiseImpl<R> chained = deferred();
- onResolve(chained.new FlatMap<>(this, mapper));
- return chained;
+ chain(this, chained.new FlatMap<>(this, mapper));
+ return chained.orDone();
}
/**
@@ -281,8 +317,8 @@ abstract class PromiseImpl<T> implements Promise<T> {
@Override
public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) {
DeferredPromiseImpl<T> chained = deferred();
- onResolve(chained.new Recover(this, recovery));
- return chained;
+ chain(this, chained.new Recover(this, recovery));
+ return chained.orDone();
}
/**
@@ -291,8 +327,8 @@ abstract class PromiseImpl<T> implements Promise<T> {
@Override
public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) {
DeferredPromiseImpl<T> chained = deferred();
- onResolve(chained.new RecoverWith(this, recovery));
- return chained;
+ chain(this, chained.new RecoverWith(this, recovery));
+ return chained.orDone();
}
/**
@@ -301,85 +337,28 @@ abstract class PromiseImpl<T> implements Promise<T> {
@Override
public Promise<T> fallbackTo(Promise<? extends T> fallback) {
DeferredPromiseImpl<T> chained = deferred();
- onResolve(chained.new FallbackTo(this, fallback));
- return chained;
+ chain(this, chained.new FallbackTo(this, fallback));
+ return chained.orDone();
}
/**
* {@inheritDoc}
- *
- * @since 1.1
*/
@Override
public Promise<T> timeout(long millis) {
DeferredPromiseImpl<T> chained = deferred();
- onResolve(chained.new ChainImpl(this));
- if (!isDone()) {
- PromiseImpl<T> timedout = new FailedPromiseImpl<>(
- new TimeoutException(), factory);
- onResolve(new Timeout(chained.new ChainImpl(timedout), millis,
- TimeUnit.MILLISECONDS));
- }
- return chained;
- }
-
- /**
- * Timeout class used by the {@link PromiseImpl#timeout(long)} method to
- * cancel timeout when the Promise is resolved.
- *
- * @Immutable
- * @since 1.1
- */
- private final class Timeout implements Runnable {
- private final ScheduledFuture< ? > future;
-
- Timeout(Runnable operation, long delay, TimeUnit unit) {
- future = schedule(operation, delay, unit);
- }
-
- @Override
- public void run() {
- if (future != null) {
- future.cancel(false);
- }
- }
+ chain(this, chained.new Timeout(this, millis));
+ return chained.orDone();
}
/**
* {@inheritDoc}
- *
- * @since 1.1
*/
@Override
public Promise<T> delay(long millis) {
DeferredPromiseImpl<T> chained = deferred();
- onResolve(new Delay(chained.new ChainImpl(this), millis,
- TimeUnit.MILLISECONDS));
- return chained;
- }
-
- /**
- * Delay class used by the {@link PromiseImpl#delay(long)} method to delay
- * chaining a promise.
- *
- * @Immutable
- * @since 1.1
- */
- private final class Delay implements Runnable {
- private final Runnable operation;
- private final long delay;
- private final TimeUnit unit;
-
- Delay(Runnable operation, long delay, TimeUnit unit) {
- this.operation = operation;
- this.delay = delay;
- this.unit = unit;
- }
-
- @Override
- public void run() {
- schedule(operation, delay, unit);
- }
+ chain(this, chained.new Delay(this, millis));
+ return chained.orDone();
}
/**

Back to the top