diff options
Diffstat (limited to 'bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java')
-rw-r--r-- | bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java | 615 |
1 files changed, 615 insertions, 0 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java new file mode 100644 index 000000000..ca2be5e80 --- /dev/null +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java @@ -0,0 +1,615 @@ +/* + * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.promise; + +import java.lang.reflect.InvocationTargetException; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import org.osgi.util.function.Function; +import org.osgi.util.function.Predicate; + +/** + * Promise implementation. + * + * <p> + * This class is not used directly by clients. Clients should use + * {@link Deferred} to create a resolvable {@link Promise}. + * + * @param <T> The result type associated with the Promise. + * + * @ThreadSafe + * @author $Id$ + */ +final class PromiseImpl<T> implements Promise<T> { + /** + * A ConcurrentLinkedQueue to hold the callbacks for this Promise, so no + * additional synchronization is required to write to or read from the + * queue. + */ + private final ConcurrentLinkedQueue<Runnable> callbacks; + /** + * A CountDownLatch to manage the resolved state of this Promise. + * + * <p> + * This object is used as the synchronizing object to provide a critical + * section in {@link #resolve(Object, Throwable)} so that only a single + * thread can write the resolved state variables and open the latch. + * + * <p> + * The resolved state variables, {@link #value} and {@link #fail}, must only + * be written when the latch is closed (getCount() != 0) and must only be + * read when the latch is open (getCount() == 0). The latch state must + * always be checked before writing or reading since the resolved state + * variables' memory consistency is guarded by the latch. + */ + private final CountDownLatch resolved; + /** + * The value of this Promise if successfully resolved. + * + * @GuardedBy("resolved") + * @see #resolved + */ + private T value; + /** + * The failure of this Promise if resolved with a failure or {@code null} if + * successfully resolved. + * + * @GuardedBy("resolved") + * @see #resolved + */ + private Throwable fail; + + /** + * Initialize this Promise. + */ + PromiseImpl() { + callbacks = new ConcurrentLinkedQueue<Runnable>(); + resolved = new CountDownLatch(1); + } + + /** + * Initialize and resolve this Promise. + * + * @param v The value of this resolved Promise. + * @param f The failure of this resolved Promise. + */ + PromiseImpl(T v, Throwable f) { + value = v; + fail = f; + callbacks = new ConcurrentLinkedQueue<Runnable>(); + resolved = new CountDownLatch(0); + } + + /** + * Resolve this Promise. + * + * @param v The value of this Promise. + * @param f The failure of this Promise. + */ + void resolve(T v, Throwable f) { + // critical section: only one resolver at a time + synchronized (resolved) { + if (resolved.getCount() == 0) { + throw new IllegalStateException("Already resolved"); + } + /* + * The resolved state variables must be set before opening the + * latch. This safely publishes them to be read by other threads + * that must verify the latch is open before reading. + */ + value = v; + fail = f; + resolved.countDown(); + } + notifyCallbacks(); // call any registered callbacks + } + + /** + * Call any registered callbacks if this Promise is resolved. + */ + private void notifyCallbacks() { + if (resolved.getCount() != 0) { + return; // return if not resolved + } + + /* + * Note: multiple threads can be in this method removing callbacks from + * the queue and calling them, so the order in which callbacks are + * called cannot be specified. + */ + for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) { + try { + callback.run(); + } catch (Throwable t) { + Logger.logCallbackException(t); + } + } + } + + /** + * {@inheritDoc} + */ + public boolean isDone() { + return resolved.getCount() == 0; + } + + /** + * {@inheritDoc} + */ + public T getValue() throws InvocationTargetException, InterruptedException { + resolved.await(); + if (fail == null) { + return value; + } + throw new InvocationTargetException(fail); + } + + /** + * {@inheritDoc} + */ + public Throwable getFailure() throws InterruptedException { + resolved.await(); + return fail; + } + + /** + * {@inheritDoc} + */ + public Promise<T> onResolve(Runnable callback) { + callbacks.offer(callback); + notifyCallbacks(); // call any registered callbacks + return this; + } + + /** + * {@inheritDoc} + */ + public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) { + PromiseImpl<R> chained = new PromiseImpl<R>(); + onResolve(new Then<R>(chained, success, failure)); + return chained; + } + + /** + * {@inheritDoc} + */ + public <R> Promise<R> then(Success<? super T, ? extends R> success) { + return then(success, null); + } + + /** + * A callback used to chain promises for the {@link #then(Success, Failure)} + * method. + * + * @Immutable + */ + private final class Then<R> implements Runnable { + private final PromiseImpl<R> chained; + private final Success<T, ? extends R> success; + private final Failure failure; + + @SuppressWarnings("unchecked") + Then(PromiseImpl<R> chained, Success<? super T, ? extends R> success, Failure failure) { + this.chained = chained; + this.success = (Success<T, ? extends R>) success; + this.failure = failure; + } + + public void run() { + Throwable f; + final boolean interrupted = Thread.interrupted(); + try { + f = getFailure(); + } catch (Throwable e) { + f = e; // propagate new exception + } finally { + if (interrupted) { // restore interrupt status + Thread.currentThread().interrupt(); + } + } + if (f != null) { + if (failure != null) { + try { + failure.fail(PromiseImpl.this); + } catch (Throwable e) { + f = e; // propagate new exception + } + } + // fail chained + chained.resolve(null, f); + return; + } + Promise<? extends R> returned = null; + if (success != null) { + try { + returned = success.call(PromiseImpl.this); + } catch (Throwable e) { + chained.resolve(null, e); + return; + } + } + if (returned == null) { + // resolve chained with null value + chained.resolve(null, null); + } else { + // resolve chained when returned promise is resolved + returned.onResolve(new Chain<R>(chained, returned)); + } + } + } + + /** + * A callback used to resolve the chained Promise when the Promise promise + * is resolved. + * + * @Immutable + */ + private final static class Chain<R> implements Runnable { + private final PromiseImpl<R> chained; + private final Promise<? extends R> promise; + private final Throwable failure; + + Chain(PromiseImpl<R> chained, Promise<? extends R> promise) { + this.chained = chained; + this.promise = promise; + this.failure = null; + } + + Chain(PromiseImpl<R> chained, Promise<? extends R> promise, Throwable failure) { + this.chained = chained; + this.promise = promise; + this.failure = failure; + } + + public void run() { + R value = null; + Throwable f; + final boolean interrupted = Thread.interrupted(); + try { + f = promise.getFailure(); + if (f == null) { + value = promise.getValue(); + } else if (failure != null) { + f = failure; + } + } catch (Throwable e) { + f = e; // propagate new exception + } finally { + if (interrupted) { // restore interrupt status + Thread.currentThread().interrupt(); + } + } + chained.resolve(value, f); + } + } + + /** + * Resolve this Promise with the specified Promise. + * + * <p> + * If the specified Promise is successfully resolved, this Promise is + * resolved with the value of the specified Promise. If the specified + * Promise is resolved with a failure, this Promise is resolved with the + * failure of the specified Promise. + * + * @param with A Promise whose value or failure must be used to resolve this + * Promise. Must not be {@code null}. + * @return A Promise that is resolved only when this Promise is resolved by + * the specified Promise. The returned Promise must be successfully + * resolved with the value {@code null}, if this Promise was + * resolved by the specified Promise. The returned Promise must be + * resolved with a failure of {@link IllegalStateException}, if this + * Promise was already resolved when the specified Promise was + * resolved. + */ + Promise<Void> resolveWith(Promise<? extends T> with) { + PromiseImpl<Void> chained = new PromiseImpl<Void>(); + ResolveWith resolveWith = new ResolveWith(chained); + with.then(resolveWith, resolveWith); + return chained; + } + + /** + * A callback used to resolve this Promise with another Promise for the + * {@link PromiseImpl#resolveWith(Promise)} method. + * + * @Immutable + */ + private final class ResolveWith implements Success<T, Void>, Failure { + private final PromiseImpl<Void> chained; + + ResolveWith(PromiseImpl<Void> chained) { + this.chained = chained; + } + + public Promise<Void> call(Promise<T> with) throws Exception { + try { + resolve(with.getValue(), null); + } catch (Throwable e) { + chained.resolve(null, e); + return null; + } + chained.resolve(null, null); + return null; + } + + public void fail(Promise<?> with) throws Exception { + try { + resolve(null, with.getFailure()); + } catch (Throwable e) { + chained.resolve(null, e); + return; + } + chained.resolve(null, null); + } + } + + /** + * {@inheritDoc} + */ + public Promise<T> filter(Predicate<? super T> predicate) { + return then(new Filter<T>(predicate)); + } + + /** + * A callback used by the {@link PromiseImpl#filter(Predicate)} method. + * + * @Immutable + */ + private static final class Filter<T> implements Success<T, T> { + private final Predicate<? super T> predicate; + + Filter(Predicate<? super T> predicate) { + this.predicate = requireNonNull(predicate); + } + + public Promise<T> call(Promise<T> resolved) throws Exception { + if (predicate.test(resolved.getValue())) { + return resolved; + } + throw new NoSuchElementException(); + } + } + + /** + * {@inheritDoc} + */ + public <R> Promise<R> map(Function<? super T, ? extends R> mapper) { + return then(new Map<T, R>(mapper)); + } + + /** + * A callback used by the {@link PromiseImpl#map(Function)} method. + * + * @Immutable + */ + private static final class Map<T, R> implements Success<T, R> { + private final Function<? super T, ? extends R> mapper; + + Map(Function<? super T, ? extends R> mapper) { + this.mapper = requireNonNull(mapper); + } + + public Promise<R> call(Promise<T> resolved) throws Exception { + return new PromiseImpl<R>(mapper.apply(resolved.getValue()), null); + } + } + + /** + * {@inheritDoc} + */ + public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) { + return then(new FlatMap<T, R>(mapper)); + } + + /** + * A callback used by the {@link PromiseImpl#flatMap(Function)} method. + * + * @Immutable + */ + private static final class FlatMap<T, R> implements Success<T, R> { + private final Function<? super T, Promise<? extends R>> mapper; + + FlatMap(Function<? super T, Promise<? extends R>> mapper) { + this.mapper = requireNonNull(mapper); + } + + @SuppressWarnings("unchecked") + public Promise<R> call(Promise<T> resolved) throws Exception { + return (Promise<R>) mapper.apply(resolved.getValue()); + } + } + + /** + * {@inheritDoc} + */ + public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) { + PromiseImpl<T> chained = new PromiseImpl<T>(); + Recover<T> recover = new Recover<T>(chained, recovery); + then(recover, recover); + return chained; + } + + /** + * A callback used by the {@link PromiseImpl#recover(Function)} method. + * + * @Immutable + */ + private static final class Recover<T> implements Success<T, Void>, Failure { + private final PromiseImpl<T> chained; + private final Function<Promise<?>, ? extends T> recovery; + + Recover(PromiseImpl<T> chained, Function<Promise<?>, ? extends T> recovery) { + this.chained = chained; + this.recovery = requireNonNull(recovery); + } + + public Promise<Void> call(Promise<T> resolved) throws Exception { + T value; + try { + value = resolved.getValue(); + } catch (Throwable e) { + chained.resolve(null, e); + return null; + } + chained.resolve(value, null); + return null; + } + + public void fail(Promise<?> resolved) throws Exception { + T recovered; + Throwable failure; + try { + recovered = recovery.apply(resolved); + failure = resolved.getFailure(); + } catch (Throwable e) { + chained.resolve(null, e); + return; + } + if (recovered == null) { + chained.resolve(null, failure); + } else { + chained.resolve(recovered, null); + } + } + } + + /** + * {@inheritDoc} + */ + public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) { + PromiseImpl<T> chained = new PromiseImpl<T>(); + RecoverWith<T> recoverWith = new RecoverWith<T>(chained, recovery); + then(recoverWith, recoverWith); + return chained; + } + + /** + * A callback used by the {@link PromiseImpl#recoverWith(Function)} method. + * + * @Immutable + */ + private static final class RecoverWith<T> implements Success<T, Void>, Failure { + private final PromiseImpl<T> chained; + private final Function<Promise<?>, Promise<? extends T>> recovery; + + RecoverWith(PromiseImpl<T> chained, Function<Promise<?>, Promise<? extends T>> recovery) { + this.chained = chained; + this.recovery = requireNonNull(recovery); + } + + public Promise<Void> call(Promise<T> resolved) throws Exception { + T value; + try { + value = resolved.getValue(); + } catch (Throwable e) { + chained.resolve(null, e); + return null; + } + chained.resolve(value, null); + return null; + } + + public void fail(Promise<?> resolved) throws Exception { + Promise<? extends T> recovered; + Throwable failure; + try { + recovered = recovery.apply(resolved); + failure = resolved.getFailure(); + } catch (Throwable e) { + chained.resolve(null, e); + return; + } + if (recovered == null) { + chained.resolve(null, failure); + } else { + recovered.onResolve(new Chain<T>(chained, recovered)); + } + } + } + + /** + * {@inheritDoc} + */ + public Promise<T> fallbackTo(Promise<? extends T> fallback) { + PromiseImpl<T> chained = new PromiseImpl<T>(); + FallbackTo<T> fallbackTo = new FallbackTo<T>(chained, fallback); + then(fallbackTo, fallbackTo); + return chained; + } + + /** + * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method. + * + * @Immutable + */ + private static final class FallbackTo<T> implements Success<T, Void>, Failure { + private final PromiseImpl<T> chained; + private final Promise<? extends T> fallback; + + FallbackTo(PromiseImpl<T> chained, Promise<? extends T> fallback) { + this.chained = chained; + this.fallback = requireNonNull(fallback); + } + + public Promise<Void> call(Promise<T> resolved) throws Exception { + T value; + try { + value = resolved.getValue(); + } catch (Throwable e) { + chained.resolve(null, e); + return null; + } + chained.resolve(value, null); + return null; + } + + public void fail(Promise<?> resolved) throws Exception { + Throwable failure; + try { + failure = resolved.getFailure(); + } catch (Throwable e) { + chained.resolve(null, e); + return; + } + fallback.onResolve(new Chain<T>(chained, fallback, failure)); + } + } + + static <V> V requireNonNull(V value) { + if (value != null) { + return value; + } + throw new NullPointerException(); + } + + /** + * Use the lazy initialization holder class idiom to delay creating a Logger + * until we actually need it. + */ + private static final class Logger { + private final static java.util.logging.Logger LOGGER; + static { + LOGGER = java.util.logging.Logger.getLogger(PromiseImpl.class.getName()); + } + + static void logCallbackException(Throwable t) { + LOGGER.log(java.util.logging.Level.WARNING, "Exception from Promise callback", t); + } + } +} |