Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java')
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java615
1 files changed, 615 insertions, 0 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
new file mode 100644
index 000000000..ca2be5e80
--- /dev/null
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
@@ -0,0 +1,615 @@
+/*
+ * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.promise;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import org.osgi.util.function.Function;
+import org.osgi.util.function.Predicate;
+
+/**
+ * Promise implementation.
+ *
+ * <p>
+ * This class is not used directly by clients. Clients should use
+ * {@link Deferred} to create a resolvable {@link Promise}.
+ *
+ * @param <T> The result type associated with the Promise.
+ *
+ * @ThreadSafe
+ * @author $Id$
+ */
+final class PromiseImpl<T> implements Promise<T> {
+ /**
+ * A ConcurrentLinkedQueue to hold the callbacks for this Promise, so no
+ * additional synchronization is required to write to or read from the
+ * queue.
+ */
+ private final ConcurrentLinkedQueue<Runnable> callbacks;
+ /**
+ * A CountDownLatch to manage the resolved state of this Promise.
+ *
+ * <p>
+ * This object is used as the synchronizing object to provide a critical
+ * section in {@link #resolve(Object, Throwable)} so that only a single
+ * thread can write the resolved state variables and open the latch.
+ *
+ * <p>
+ * The resolved state variables, {@link #value} and {@link #fail}, must only
+ * be written when the latch is closed (getCount() != 0) and must only be
+ * read when the latch is open (getCount() == 0). The latch state must
+ * always be checked before writing or reading since the resolved state
+ * variables' memory consistency is guarded by the latch.
+ */
+ private final CountDownLatch resolved;
+ /**
+ * The value of this Promise if successfully resolved.
+ *
+ * @GuardedBy("resolved")
+ * @see #resolved
+ */
+ private T value;
+ /**
+ * The failure of this Promise if resolved with a failure or {@code null} if
+ * successfully resolved.
+ *
+ * @GuardedBy("resolved")
+ * @see #resolved
+ */
+ private Throwable fail;
+
+ /**
+ * Initialize this Promise.
+ */
+ PromiseImpl() {
+ callbacks = new ConcurrentLinkedQueue<Runnable>();
+ resolved = new CountDownLatch(1);
+ }
+
+ /**
+ * Initialize and resolve this Promise.
+ *
+ * @param v The value of this resolved Promise.
+ * @param f The failure of this resolved Promise.
+ */
+ PromiseImpl(T v, Throwable f) {
+ value = v;
+ fail = f;
+ callbacks = new ConcurrentLinkedQueue<Runnable>();
+ resolved = new CountDownLatch(0);
+ }
+
+ /**
+ * Resolve this Promise.
+ *
+ * @param v The value of this Promise.
+ * @param f The failure of this Promise.
+ */
+ void resolve(T v, Throwable f) {
+ // critical section: only one resolver at a time
+ synchronized (resolved) {
+ if (resolved.getCount() == 0) {
+ throw new IllegalStateException("Already resolved");
+ }
+ /*
+ * The resolved state variables must be set before opening the
+ * latch. This safely publishes them to be read by other threads
+ * that must verify the latch is open before reading.
+ */
+ value = v;
+ fail = f;
+ resolved.countDown();
+ }
+ notifyCallbacks(); // call any registered callbacks
+ }
+
+ /**
+ * Call any registered callbacks if this Promise is resolved.
+ */
+ private void notifyCallbacks() {
+ if (resolved.getCount() != 0) {
+ return; // return if not resolved
+ }
+
+ /*
+ * Note: multiple threads can be in this method removing callbacks from
+ * the queue and calling them, so the order in which callbacks are
+ * called cannot be specified.
+ */
+ for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) {
+ try {
+ callback.run();
+ } catch (Throwable t) {
+ Logger.logCallbackException(t);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isDone() {
+ return resolved.getCount() == 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public T getValue() throws InvocationTargetException, InterruptedException {
+ resolved.await();
+ if (fail == null) {
+ return value;
+ }
+ throw new InvocationTargetException(fail);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Throwable getFailure() throws InterruptedException {
+ resolved.await();
+ return fail;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Promise<T> onResolve(Runnable callback) {
+ callbacks.offer(callback);
+ notifyCallbacks(); // call any registered callbacks
+ return this;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) {
+ PromiseImpl<R> chained = new PromiseImpl<R>();
+ onResolve(new Then<R>(chained, success, failure));
+ return chained;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public <R> Promise<R> then(Success<? super T, ? extends R> success) {
+ return then(success, null);
+ }
+
+ /**
+ * A callback used to chain promises for the {@link #then(Success, Failure)}
+ * method.
+ *
+ * @Immutable
+ */
+ private final class Then<R> implements Runnable {
+ private final PromiseImpl<R> chained;
+ private final Success<T, ? extends R> success;
+ private final Failure failure;
+
+ @SuppressWarnings("unchecked")
+ Then(PromiseImpl<R> chained, Success<? super T, ? extends R> success, Failure failure) {
+ this.chained = chained;
+ this.success = (Success<T, ? extends R>) success;
+ this.failure = failure;
+ }
+
+ public void run() {
+ Throwable f;
+ final boolean interrupted = Thread.interrupted();
+ try {
+ f = getFailure();
+ } catch (Throwable e) {
+ f = e; // propagate new exception
+ } finally {
+ if (interrupted) { // restore interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (f != null) {
+ if (failure != null) {
+ try {
+ failure.fail(PromiseImpl.this);
+ } catch (Throwable e) {
+ f = e; // propagate new exception
+ }
+ }
+ // fail chained
+ chained.resolve(null, f);
+ return;
+ }
+ Promise<? extends R> returned = null;
+ if (success != null) {
+ try {
+ returned = success.call(PromiseImpl.this);
+ } catch (Throwable e) {
+ chained.resolve(null, e);
+ return;
+ }
+ }
+ if (returned == null) {
+ // resolve chained with null value
+ chained.resolve(null, null);
+ } else {
+ // resolve chained when returned promise is resolved
+ returned.onResolve(new Chain<R>(chained, returned));
+ }
+ }
+ }
+
+ /**
+ * A callback used to resolve the chained Promise when the Promise promise
+ * is resolved.
+ *
+ * @Immutable
+ */
+ private final static class Chain<R> implements Runnable {
+ private final PromiseImpl<R> chained;
+ private final Promise<? extends R> promise;
+ private final Throwable failure;
+
+ Chain(PromiseImpl<R> chained, Promise<? extends R> promise) {
+ this.chained = chained;
+ this.promise = promise;
+ this.failure = null;
+ }
+
+ Chain(PromiseImpl<R> chained, Promise<? extends R> promise, Throwable failure) {
+ this.chained = chained;
+ this.promise = promise;
+ this.failure = failure;
+ }
+
+ public void run() {
+ R value = null;
+ Throwable f;
+ final boolean interrupted = Thread.interrupted();
+ try {
+ f = promise.getFailure();
+ if (f == null) {
+ value = promise.getValue();
+ } else if (failure != null) {
+ f = failure;
+ }
+ } catch (Throwable e) {
+ f = e; // propagate new exception
+ } finally {
+ if (interrupted) { // restore interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+ chained.resolve(value, f);
+ }
+ }
+
+ /**
+ * Resolve this Promise with the specified Promise.
+ *
+ * <p>
+ * If the specified Promise is successfully resolved, this Promise is
+ * resolved with the value of the specified Promise. If the specified
+ * Promise is resolved with a failure, this Promise is resolved with the
+ * failure of the specified Promise.
+ *
+ * @param with A Promise whose value or failure must be used to resolve this
+ * Promise. Must not be {@code null}.
+ * @return A Promise that is resolved only when this Promise is resolved by
+ * the specified Promise. The returned Promise must be successfully
+ * resolved with the value {@code null}, if this Promise was
+ * resolved by the specified Promise. The returned Promise must be
+ * resolved with a failure of {@link IllegalStateException}, if this
+ * Promise was already resolved when the specified Promise was
+ * resolved.
+ */
+ Promise<Void> resolveWith(Promise<? extends T> with) {
+ PromiseImpl<Void> chained = new PromiseImpl<Void>();
+ ResolveWith resolveWith = new ResolveWith(chained);
+ with.then(resolveWith, resolveWith);
+ return chained;
+ }
+
+ /**
+ * A callback used to resolve this Promise with another Promise for the
+ * {@link PromiseImpl#resolveWith(Promise)} method.
+ *
+ * @Immutable
+ */
+ private final class ResolveWith implements Success<T, Void>, Failure {
+ private final PromiseImpl<Void> chained;
+
+ ResolveWith(PromiseImpl<Void> chained) {
+ this.chained = chained;
+ }
+
+ public Promise<Void> call(Promise<T> with) throws Exception {
+ try {
+ resolve(with.getValue(), null);
+ } catch (Throwable e) {
+ chained.resolve(null, e);
+ return null;
+ }
+ chained.resolve(null, null);
+ return null;
+ }
+
+ public void fail(Promise<?> with) throws Exception {
+ try {
+ resolve(null, with.getFailure());
+ } catch (Throwable e) {
+ chained.resolve(null, e);
+ return;
+ }
+ chained.resolve(null, null);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Promise<T> filter(Predicate<? super T> predicate) {
+ return then(new Filter<T>(predicate));
+ }
+
+ /**
+ * A callback used by the {@link PromiseImpl#filter(Predicate)} method.
+ *
+ * @Immutable
+ */
+ private static final class Filter<T> implements Success<T, T> {
+ private final Predicate<? super T> predicate;
+
+ Filter(Predicate<? super T> predicate) {
+ this.predicate = requireNonNull(predicate);
+ }
+
+ public Promise<T> call(Promise<T> resolved) throws Exception {
+ if (predicate.test(resolved.getValue())) {
+ return resolved;
+ }
+ throw new NoSuchElementException();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public <R> Promise<R> map(Function<? super T, ? extends R> mapper) {
+ return then(new Map<T, R>(mapper));
+ }
+
+ /**
+ * A callback used by the {@link PromiseImpl#map(Function)} method.
+ *
+ * @Immutable
+ */
+ private static final class Map<T, R> implements Success<T, R> {
+ private final Function<? super T, ? extends R> mapper;
+
+ Map(Function<? super T, ? extends R> mapper) {
+ this.mapper = requireNonNull(mapper);
+ }
+
+ public Promise<R> call(Promise<T> resolved) throws Exception {
+ return new PromiseImpl<R>(mapper.apply(resolved.getValue()), null);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) {
+ return then(new FlatMap<T, R>(mapper));
+ }
+
+ /**
+ * A callback used by the {@link PromiseImpl#flatMap(Function)} method.
+ *
+ * @Immutable
+ */
+ private static final class FlatMap<T, R> implements Success<T, R> {
+ private final Function<? super T, Promise<? extends R>> mapper;
+
+ FlatMap(Function<? super T, Promise<? extends R>> mapper) {
+ this.mapper = requireNonNull(mapper);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Promise<R> call(Promise<T> resolved) throws Exception {
+ return (Promise<R>) mapper.apply(resolved.getValue());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) {
+ PromiseImpl<T> chained = new PromiseImpl<T>();
+ Recover<T> recover = new Recover<T>(chained, recovery);
+ then(recover, recover);
+ return chained;
+ }
+
+ /**
+ * A callback used by the {@link PromiseImpl#recover(Function)} method.
+ *
+ * @Immutable
+ */
+ private static final class Recover<T> implements Success<T, Void>, Failure {
+ private final PromiseImpl<T> chained;
+ private final Function<Promise<?>, ? extends T> recovery;
+
+ Recover(PromiseImpl<T> chained, Function<Promise<?>, ? extends T> recovery) {
+ this.chained = chained;
+ this.recovery = requireNonNull(recovery);
+ }
+
+ public Promise<Void> call(Promise<T> resolved) throws Exception {
+ T value;
+ try {
+ value = resolved.getValue();
+ } catch (Throwable e) {
+ chained.resolve(null, e);
+ return null;
+ }
+ chained.resolve(value, null);
+ return null;
+ }
+
+ public void fail(Promise<?> resolved) throws Exception {
+ T recovered;
+ Throwable failure;
+ try {
+ recovered = recovery.apply(resolved);
+ failure = resolved.getFailure();
+ } catch (Throwable e) {
+ chained.resolve(null, e);
+ return;
+ }
+ if (recovered == null) {
+ chained.resolve(null, failure);
+ } else {
+ chained.resolve(recovered, null);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) {
+ PromiseImpl<T> chained = new PromiseImpl<T>();
+ RecoverWith<T> recoverWith = new RecoverWith<T>(chained, recovery);
+ then(recoverWith, recoverWith);
+ return chained;
+ }
+
+ /**
+ * A callback used by the {@link PromiseImpl#recoverWith(Function)} method.
+ *
+ * @Immutable
+ */
+ private static final class RecoverWith<T> implements Success<T, Void>, Failure {
+ private final PromiseImpl<T> chained;
+ private final Function<Promise<?>, Promise<? extends T>> recovery;
+
+ RecoverWith(PromiseImpl<T> chained, Function<Promise<?>, Promise<? extends T>> recovery) {
+ this.chained = chained;
+ this.recovery = requireNonNull(recovery);
+ }
+
+ public Promise<Void> call(Promise<T> resolved) throws Exception {
+ T value;
+ try {
+ value = resolved.getValue();
+ } catch (Throwable e) {
+ chained.resolve(null, e);
+ return null;
+ }
+ chained.resolve(value, null);
+ return null;
+ }
+
+ public void fail(Promise<?> resolved) throws Exception {
+ Promise<? extends T> recovered;
+ Throwable failure;
+ try {
+ recovered = recovery.apply(resolved);
+ failure = resolved.getFailure();
+ } catch (Throwable e) {
+ chained.resolve(null, e);
+ return;
+ }
+ if (recovered == null) {
+ chained.resolve(null, failure);
+ } else {
+ recovered.onResolve(new Chain<T>(chained, recovered));
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Promise<T> fallbackTo(Promise<? extends T> fallback) {
+ PromiseImpl<T> chained = new PromiseImpl<T>();
+ FallbackTo<T> fallbackTo = new FallbackTo<T>(chained, fallback);
+ then(fallbackTo, fallbackTo);
+ return chained;
+ }
+
+ /**
+ * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method.
+ *
+ * @Immutable
+ */
+ private static final class FallbackTo<T> implements Success<T, Void>, Failure {
+ private final PromiseImpl<T> chained;
+ private final Promise<? extends T> fallback;
+
+ FallbackTo(PromiseImpl<T> chained, Promise<? extends T> fallback) {
+ this.chained = chained;
+ this.fallback = requireNonNull(fallback);
+ }
+
+ public Promise<Void> call(Promise<T> resolved) throws Exception {
+ T value;
+ try {
+ value = resolved.getValue();
+ } catch (Throwable e) {
+ chained.resolve(null, e);
+ return null;
+ }
+ chained.resolve(value, null);
+ return null;
+ }
+
+ public void fail(Promise<?> resolved) throws Exception {
+ Throwable failure;
+ try {
+ failure = resolved.getFailure();
+ } catch (Throwable e) {
+ chained.resolve(null, e);
+ return;
+ }
+ fallback.onResolve(new Chain<T>(chained, fallback, failure));
+ }
+ }
+
+ static <V> V requireNonNull(V value) {
+ if (value != null) {
+ return value;
+ }
+ throw new NullPointerException();
+ }
+
+ /**
+ * Use the lazy initialization holder class idiom to delay creating a Logger
+ * until we actually need it.
+ */
+ private static final class Logger {
+ private final static java.util.logging.Logger LOGGER;
+ static {
+ LOGGER = java.util.logging.Logger.getLogger(PromiseImpl.class.getName());
+ }
+
+ static void logCallbackException(Throwable t) {
+ LOGGER.log(java.util.logging.Level.WARNING, "Exception from Promise callback", t);
+ }
+ }
+}

Back to the top