diff options
author | Thomas Watson | 2017-04-28 16:49:04 +0000 |
---|---|---|
committer | Thomas Watson | 2017-06-16 12:38:08 +0000 |
commit | 9c940504036225a9c7d3f869f6d1ca0f55083e50 (patch) | |
tree | 0457ec5d293c13fbbaa124dcff9bc0e06240a21a | |
parent | 89617708753627e2aecdea7393d64ccf24a0b0eb (diff) | |
download | rt.equinox.framework-9c940504036225a9c7d3f869f6d1ca0f55083e50.tar.gz rt.equinox.framework-9c940504036225a9c7d3f869f6d1ca0f55083e50.tar.xz rt.equinox.framework-9c940504036225a9c7d3f869f6d1ca0f55083e50.zip |
Update promise and function API to 1.1
Change-Id: I606cabb03fb6a9ea7f1aed9757a197e3d70480d8
Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
17 files changed, 786 insertions, 300 deletions
diff --git a/bundles/org.eclipse.osgi.util/.classpath b/bundles/org.eclipse.osgi.util/.classpath index 09b9f31b1..7ecc55afe 100644 --- a/bundles/org.eclipse.osgi.util/.classpath +++ b/bundles/org.eclipse.osgi.util/.classpath @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <classpath> - <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.5"/> + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/> <classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/> <classpathentry kind="src" path="src"> <attributes> diff --git a/bundles/org.eclipse.osgi.util/.settings/org.eclipse.jdt.core.prefs b/bundles/org.eclipse.osgi.util/.settings/org.eclipse.jdt.core.prefs index a29e75467..99c66c2d6 100644 --- a/bundles/org.eclipse.osgi.util/.settings/org.eclipse.jdt.core.prefs +++ b/bundles/org.eclipse.osgi.util/.settings/org.eclipse.jdt.core.prefs @@ -13,9 +13,9 @@ org.eclipse.jdt.core.compiler.annotation.nonnullbydefault=org.eclipse.jdt.annota org.eclipse.jdt.core.compiler.annotation.nullable=org.eclipse.jdt.annotation.Nullable org.eclipse.jdt.core.compiler.annotation.nullanalysis=disabled org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled -org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.5 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7 org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve -org.eclipse.jdt.core.compiler.compliance=1.5 +org.eclipse.jdt.core.compiler.compliance=1.7 org.eclipse.jdt.core.compiler.debug.lineNumber=generate org.eclipse.jdt.core.compiler.debug.localVariable=generate org.eclipse.jdt.core.compiler.debug.sourceFile=generate @@ -113,6 +113,6 @@ org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=warning org.eclipse.jdt.core.compiler.problem.unusedTypeParameter=ignore org.eclipse.jdt.core.compiler.problem.unusedWarningToken=warning org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=warning -org.eclipse.jdt.core.compiler.source=1.5 +org.eclipse.jdt.core.compiler.source=1.7 org.eclipse.jdt.core.incompatibleJDKLevel=ignore org.eclipse.jdt.core.incompleteClasspath=error diff --git a/bundles/org.eclipse.osgi.util/META-INF/MANIFEST.MF b/bundles/org.eclipse.osgi.util/META-INF/MANIFEST.MF index 7e7bf9de7..19894f6df 100644 --- a/bundles/org.eclipse.osgi.util/META-INF/MANIFEST.MF +++ b/bundles/org.eclipse.osgi.util/META-INF/MANIFEST.MF @@ -8,16 +8,16 @@ Bundle-Vendor: %eclipse.org Bundle-Localization: plugin Bundle-DocUrl: http://www.eclipse.org Bundle-ContactAddress: www.eclipse.org -Export-Package: org.osgi.util.function;version="1.0", +Export-Package: org.osgi.util.function;version="1.1", org.osgi.util.measurement;version="1.0.1", org.osgi.util.position;version="1.0.1";uses:="org.osgi.util.measurement", - org.osgi.util.promise;version="1.0";uses:="org.osgi.util.function", + org.osgi.util.promise;version="1.1";uses:="org.osgi.util.function", org.osgi.util.xml;version="1.0.1";uses:="org.osgi.framework,javax.xml.parsers" Import-Package: org.osgi.framework; version=1.1, javax.xml.parsers, - org.osgi.util.function; version="[1.0, 1.1)", + org.osgi.util.function; version="[1.1, 1.2)", org.osgi.util.measurement; version="[1.0.1, 1.1)", org.osgi.util.position; version="[1.0.1, 1.1)", - org.osgi.util.promise; version="[1.0, 1.1)", + org.osgi.util.promise; version="[1.1, 1.2)", org.osgi.util.xml; version="[1.0.1, 1.1)" -Bundle-RequiredExecutionEnvironment: J2SE-1.5 +Bundle-RequiredExecutionEnvironment: JavaSE-1.7 diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java new file mode 100644 index 000000000..17ff376bc --- /dev/null +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) OSGi Alliance (2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.function; + +import org.osgi.annotation.versioning.ConsumerType; + +/** + * A callback that performs an operation and may throw an exception. + * <p> + * This is a functional interface and can be used as the assignment target for a + * lambda expression or method reference. + * + * @ThreadSafe + * @since 1.1 + * @author $Id$ + */ +@ConsumerType +@FunctionalInterface +public interface Callback { + /** + * Execute the callback. + * + * @throws Exception An exception thrown by the method. + */ + void run() throws Exception; +} diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Function.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Function.java index 5d812f75c..3d17c97c7 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Function.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Function.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,12 +32,14 @@ import org.osgi.annotation.versioning.ConsumerType; * @author $Id$ */ @ConsumerType +@FunctionalInterface public interface Function<T, R> { /** * Applies this function to the specified argument. * * @param t The input to this function. * @return The output of this function. + * @throws Exception An exception thrown by the method. */ - R apply(T t); + R apply(T t) throws Exception; } diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Predicate.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Predicate.java index 0c2c61f78..681b771c2 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Predicate.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Predicate.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import org.osgi.annotation.versioning.ConsumerType; * @author $Id$ */ @ConsumerType +@FunctionalInterface public interface Predicate<T> { /** * Evaluates this predicate on the specified argument. @@ -38,6 +39,7 @@ public interface Predicate<T> { * @param t The input to this predicate. * @return {@code true} if the specified argument is accepted by this * predicate; {@code false} otherwise. + * @throws Exception An exception thrown by the method. */ - boolean test(T t); + boolean test(T t) throws Exception; } diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/package-info.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/package-info.java index 899d786b2..82ed82dc4 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/package-info.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/package-info.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,25 +15,23 @@ */ /** - * Function Package Version 1.0. - * + * Function Package Version 1.1. * <p> * Bundles wishing to use this package must list the package in the * Import-Package header of the bundle's manifest. - * * <p> * Example import for consumers using the API in this package: * <p> - * {@code Import-Package: org.osgi.util.function; version="[1.0,2.0)"} + * {@code Import-Package: org.osgi.util.function; version="[1.1,2.0)"} * <p> * Example import for providers implementing the API in this package: * <p> - * {@code Import-Package: org.osgi.util.function; version="[1.0,1.1)"} + * {@code Import-Package: org.osgi.util.function; version="[1.1,1.2)"} * * @author $Id$ */ -@Version("1.0") +@Version("1.1") package org.osgi.util.function; import org.osgi.annotation.versioning.Version; diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/packageinfo b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/packageinfo deleted file mode 100644 index 7c8de0324..000000000 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/packageinfo +++ /dev/null @@ -1 +0,0 @@ -version 1.0 diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java index e9ff0a6fa..004925fa1 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,7 +45,7 @@ public class Deferred<T> { * Create a new Deferred with an associated Promise. */ public Deferred() { - promise = new PromiseImpl<T>(); + promise = new PromiseImpl<>(); } /** @@ -64,7 +64,7 @@ public class Deferred<T> { * After the associated Promise is resolved with the specified value, all * registered {@link Promise#onResolve(Runnable) callbacks} are called and * any {@link Promise#then(Success, Failure) chained} Promises are resolved. - * + * This may occur asynchronously to this method. * <p> * Resolving the associated Promise <i>happens-before</i> any registered * callback is called. That is, in a registered callback, @@ -87,7 +87,7 @@ public class Deferred<T> { * After the associated Promise is resolved with the specified failure, all * registered {@link Promise#onResolve(Runnable) callbacks} are called and * any {@link Promise#then(Success, Failure) chained} Promises are resolved. - * + * This may occur asynchronously to this method. * <p> * Resolving the associated Promise <i>happens-before</i> any registered * callback is called. That is, in a registered callback, @@ -118,7 +118,7 @@ public class Deferred<T> { * After the associated Promise is resolved with the specified Promise, all * registered {@link Promise#onResolve(Runnable) callbacks} are called and * any {@link Promise#then(Success, Failure) chained} Promises are resolved. - * + * This may occur asynchronously to this method. * <p> * Resolving the associated Promise <i>happens-before</i> any registered * callback is called. That is, in a registered callback, @@ -139,4 +139,15 @@ public class Deferred<T> { public Promise<Void> resolveWith(Promise<? extends T> with) { return promise.resolveWith(with); } + + /** + * Returns a string representation of the associated Promise. + * + * @return A string representation of the associated Promise. + * @since 1.1 + */ + @Override + public String toString() { + return promise.toString(); + } } diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Failure.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Failure.java index 4e6a17884..9d491b595 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Failure.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Failure.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2015). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ import org.osgi.annotation.versioning.ConsumerType; * @author $Id$ */ @ConsumerType +@FunctionalInterface public interface Failure { /** * Failure callback for a Promise. diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java index ffa193393..f9e4ef5d2 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,13 +17,14 @@ package org.osgi.util.promise; import java.lang.reflect.InvocationTargetException; + import org.osgi.annotation.versioning.ProviderType; +import org.osgi.util.function.Callback; import org.osgi.util.function.Function; import org.osgi.util.function.Predicate; /** * A Promise of a value. - * * <p> * A Promise represents a future value. It handles the interactions for * asynchronous processing. A {@link Deferred} object can be used to create a @@ -33,7 +34,6 @@ import org.osgi.util.function.Predicate; * or the Promise can be used in chaining. In chaining, callbacks are provided * that receive the resolved Promise, and a new Promise is generated that * resolves based upon the result of a callback. - * * <p> * Both {@link #onResolve(Runnable) callbacks} and * {@link #then(Success, Failure) chaining} can be repeated any number of times, @@ -42,32 +42,19 @@ import org.osgi.util.function.Predicate; * Example callback usage: * * <pre> - * final Promise<String> foo = foo(); - * foo.onResolve(new Runnable() { - * public void run() { - * System.out.println(foo.getValue()); - * } - * }); + * Promise<String> foo = foo(); + * foo.onResolve(() -> System.out.println("resolved")); * </pre> * * Example chaining usage; * * <pre> - * Success<String,String> doubler = new Success<String,String>() { - * public Promise<String> call(Promise<String> p) throws Exception { - * return Promises.resolved(p.getValue()+p.getValue()); - * } - * }; - * final Promise<String> foo = foo().then(doubler).then(doubler); - * foo.onResolve(new Runnable() { - * public void run() { - * System.out.println(foo.getValue()); - * } - * }); + * Success<String,String> doubler = p -> Promises + * .resolved(p.getValue() + p.getValue()); + * Promise<String> foo = foo().then(doubler).then(doubler); * </pre> * * @param <T> The value type associated with this Promise. - * * @ThreadSafe * @author $Id$ */ @@ -231,6 +218,40 @@ public interface Promise<T> { <R> Promise<R> then(Success<? super T, ? extends R> success); /** + * Chain a new Promise to this Promise with a callback. + * <p> + * The specified {@link Callback} is called when this Promise is resolved + * either successfully or with a failure. + * <p> + * This method returns a new Promise which is chained to this Promise. The + * returned Promise must be resolved when this Promise is resolved after the + * specified callback is executed. If the callback throws an exception, the + * returned Promise is failed with that exception. Otherwise the returned + * Promise is resolved with this Promise. + * <p> + * This method may be called at any time including before and after this + * Promise has been resolved. + * <p> + * Resolving this Promise <i>happens-before</i> any registered callback is + * called. That is, in a registered callback, {@link #isDone()} must return + * {@code true} and {@link #getValue()} and {@link #getFailure()} must not + * block. + * <p> + * A callback may be called on a different thread than the thread which + * registered the callback. So the callback must be thread safe but can rely + * upon that the registration of the callback <i>happens-before</i> the + * registered callback is called. + * + * @param callback A callback to be called when this Promise is resolved. + * Must not be {@code null}. + * @return A new Promise which is chained to this Promise. The returned + * Promise must be resolved when this Promise is resolved after the + * specified callback is executed. + * @since 1.1 + */ + Promise<T> then(Callback callback); + + /** * Filter the value of this Promise. * * <p> @@ -400,4 +421,36 @@ public interface Promise<T> { * the value of the specified Promise. */ Promise<T> fallbackTo(Promise<? extends T> fallback); + + /** + * Time out the resolution of this Promise. + * <p> + * If this Promise is successfully resolved before the timeout, the returned + * Promise is resolved with the value of this Promise. If this Promise is + * resolved with a failure before the timeout, the returned Promise is + * resolved with the failure of this Promise. If the timeout is reached + * before this Promise is resolved, the returned Promise is failed with a + * {@link TimeoutException}. + * + * @param milliseconds The time to wait in milliseconds. Zero and negative + * time is treated as an immediate timeout. + * @return A Promise that is resolved when either this Promise is resolved + * or the specified timeout is reached. + * @since 1.1 + */ + Promise<T> timeout(long milliseconds); + + /** + * Delay after the resolution of this Promise. + * <p> + * Once this Promise is resolved, resolve the returned Promise with this + * Promise after the specified delay. + * + * @param milliseconds The time to delay in milliseconds. Zero and negative + * time is treated as no delay. + * @return A Promise that is resolved with this Promise after this Promise + * is resolved and the specified delay has elapsed. + * @since 1.1 + */ + Promise<T> delay(long milliseconds); } diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java index ca2be5e80..2afb3771f 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,25 @@ package org.osgi.util.promise; import java.lang.reflect.InvocationTargetException; import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.osgi.util.function.Callback; import org.osgi.util.function.Function; import org.osgi.util.function.Predicate; @@ -44,12 +61,10 @@ final class PromiseImpl<T> implements Promise<T> { private final ConcurrentLinkedQueue<Runnable> callbacks; /** * A CountDownLatch to manage the resolved state of this Promise. - * * <p> * This object is used as the synchronizing object to provide a critical - * section in {@link #resolve(Object, Throwable)} so that only a single + * section in {@link #tryResolve(Object, Throwable)} so that only a single * thread can write the resolved state variables and open the latch. - * * <p> * The resolved state variables, {@link #value} and {@link #fail}, must only * be written when the latch is closed (getCount() != 0) and must only be @@ -78,7 +93,7 @@ final class PromiseImpl<T> implements Promise<T> { * Initialize this Promise. */ PromiseImpl() { - callbacks = new ConcurrentLinkedQueue<Runnable>(); + callbacks = new ConcurrentLinkedQueue<>(); resolved = new CountDownLatch(1); } @@ -89,34 +104,63 @@ final class PromiseImpl<T> implements Promise<T> { * @param f The failure of this resolved Promise. */ PromiseImpl(T v, Throwable f) { - value = v; - fail = f; - callbacks = new ConcurrentLinkedQueue<Runnable>(); + if (f == null) { + value = v; + } else { + fail = f; + } + callbacks = new ConcurrentLinkedQueue<>(); resolved = new CountDownLatch(0); } /** - * Resolve this Promise. + * Try to resolve this Promise. + * <p> + * If this Promise was already resolved, return false. Otherwise, resolve + * this Promise and return true. * * @param v The value of this Promise. * @param f The failure of this Promise. + * @return false if this Promise was already resolved; true if this method + * resolved this Promise. + * @since 1.1 */ - void resolve(T v, Throwable f) { + boolean tryResolve(T v, Throwable f) { // critical section: only one resolver at a time synchronized (resolved) { if (resolved.getCount() == 0) { - throw new IllegalStateException("Already resolved"); + return false; } /* * The resolved state variables must be set before opening the * latch. This safely publishes them to be read by other threads * that must verify the latch is open before reading. */ - value = v; - fail = f; + if (f == null) { + value = v; + } else { + fail = f; + } resolved.countDown(); } notifyCallbacks(); // call any registered callbacks + return true; + } + + /** + * Resolve this Promise. + * <p> + * If this Promise was already resolved, throw IllegalStateException. + * Otherwise, resolve this Promise. + * + * @param v The value of this Promise. + * @param f The failure of this Promise. + * @throws IllegalStateException If this Promise was already resolved. + */ + void resolve(T v, Throwable f) { + if (!tryResolve(v, f)) { + throw new IllegalStateException("Already resolved"); + } } /** @@ -129,21 +173,18 @@ final class PromiseImpl<T> implements Promise<T> { /* * Note: multiple threads can be in this method removing callbacks from - * the queue and calling them, so the order in which callbacks are - * called cannot be specified. + * the queue and executing them, so the order in which callbacks are + * executed cannot be specified. */ for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) { - try { - callback.run(); - } catch (Throwable t) { - Logger.logCallbackException(t); - } + Callbacks.execute(callback); } } /** * {@inheritDoc} */ + @Override public boolean isDone() { return resolved.getCount() == 0; } @@ -151,6 +192,7 @@ final class PromiseImpl<T> implements Promise<T> { /** * {@inheritDoc} */ + @Override public T getValue() throws InvocationTargetException, InterruptedException { resolved.await(); if (fail == null) { @@ -162,14 +204,40 @@ final class PromiseImpl<T> implements Promise<T> { /** * {@inheritDoc} */ + @Override public Throwable getFailure() throws InterruptedException { resolved.await(); return fail; } /** + * @since 1.1 + */ + @Override + public String toString() { + if (!isDone()) { + return super.toString() + "[unresolved]"; + } + final boolean interrupted = Thread.interrupted(); + try { + Throwable t = getFailure(); + if (t != null) { + return super.toString() + "[failed: " + t + "]"; + } + return super.toString() + "[resolved: " + getValue() + "]"; + } catch (InterruptedException | InvocationTargetException e) { + return super.toString() + "[" + e + "]"; + } finally { + if (interrupted) { // restore interrupt status + Thread.currentThread().interrupt(); + } + } + } + + /** * {@inheritDoc} */ + @Override public Promise<T> onResolve(Runnable callback) { callbacks.offer(callback); notifyCallbacks(); // call any registered callbacks @@ -179,15 +247,17 @@ final class PromiseImpl<T> implements Promise<T> { /** * {@inheritDoc} */ + @Override public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) { - PromiseImpl<R> chained = new PromiseImpl<R>(); - onResolve(new Then<R>(chained, success, failure)); + PromiseImpl<R> chained = new PromiseImpl<>(); + onResolve(chained.new Then<>(this, success, failure)); return chained; } /** * {@inheritDoc} */ + @Override public <R> Promise<R> then(Success<? super T, ? extends R> success) { return then(success, null); } @@ -198,58 +268,44 @@ final class PromiseImpl<T> implements Promise<T> { * * @Immutable */ - private final class Then<R> implements Runnable { - private final PromiseImpl<R> chained; - private final Success<T, ? extends R> success; + private final class Then<P> implements Runnable { + private final Promise<P> promise; + private final Success<P, ? extends T> success; private final Failure failure; @SuppressWarnings("unchecked") - Then(PromiseImpl<R> chained, Success<? super T, ? extends R> success, Failure failure) { - this.chained = chained; - this.success = (Success<T, ? extends R>) success; + Then(Promise<P> promise, Success< ? super P, ? extends T> success, + Failure failure) { + this.promise = promise; + this.success = (Success<P, ? extends T>) success; this.failure = failure; } + @Override public void run() { - Throwable f; - final boolean interrupted = Thread.interrupted(); - try { - f = getFailure(); - } catch (Throwable e) { - f = e; // propagate new exception - } finally { - if (interrupted) { // restore interrupt status - Thread.currentThread().interrupt(); - } - } + Throwable f = Result.collect(promise).fail; if (f != null) { if (failure != null) { try { - failure.fail(PromiseImpl.this); + failure.fail(promise); } catch (Throwable e) { f = e; // propagate new exception } } - // fail chained - chained.resolve(null, f); - return; - } - Promise<? extends R> returned = null; - if (success != null) { + } else if (success != null) { + Promise< ? extends T> returned = null; try { - returned = success.call(PromiseImpl.this); + returned = success.call(promise); } catch (Throwable e) { - chained.resolve(null, e); + f = e; // propagate new exception + } + if (returned != null) { + // resolve chained when returned promise is resolved + returned.onResolve(new Chain(returned)); return; } } - if (returned == null) { - // resolve chained with null value - chained.resolve(null, null); - } else { - // resolve chained when returned promise is resolved - returned.onResolve(new Chain<R>(chained, returned)); - } + tryResolve(null, f); } } @@ -259,46 +315,58 @@ final class PromiseImpl<T> implements Promise<T> { * * @Immutable */ - private final static class Chain<R> implements Runnable { - private final PromiseImpl<R> chained; - private final Promise<? extends R> promise; + private final class Chain implements Runnable { + private final Promise< ? extends T> promise; private final Throwable failure; + private final Callback callback; - Chain(PromiseImpl<R> chained, Promise<? extends R> promise) { - this.chained = chained; + Chain(Promise< ? extends T> promise) { this.promise = promise; this.failure = null; + this.callback = null; } - Chain(PromiseImpl<R> chained, Promise<? extends R> promise, Throwable failure) { - this.chained = chained; + Chain(Promise< ? extends T> promise, Throwable failure) { this.promise = promise; - this.failure = failure; + this.failure = requireNonNull(failure); + this.callback = null; + } + + Chain(Promise< ? extends T> promise, Callback callback) { + this.promise = promise; + this.failure = null; + this.callback = requireNonNull(callback); } + @Override public void run() { - R value = null; - Throwable f; - final boolean interrupted = Thread.interrupted(); - try { - f = promise.getFailure(); - if (f == null) { - value = promise.getValue(); - } else if (failure != null) { - f = failure; - } - } catch (Throwable e) { - f = e; // propagate new exception - } finally { - if (interrupted) { // restore interrupt status - Thread.currentThread().interrupt(); + if (callback != null) { + try { + callback.run(); + } catch (Throwable e) { + tryResolve(null, e); + return; } } - chained.resolve(value, f); + Result<T> result = Result.collect(promise); + if ((result.fail != null) && (failure != null)) { + result.fail = failure; + } + tryResolve(result.value, result.fail); } } /** + * {@inheritDoc} + */ + @Override + public Promise<T> then(Callback callback) { + PromiseImpl<T> chained = new PromiseImpl<>(); + onResolve(chained.new Chain(this, callback)); + return chained; + } + + /** * Resolve this Promise with the specified Promise. * * <p> @@ -318,9 +386,8 @@ final class PromiseImpl<T> implements Promise<T> { * resolved. */ Promise<Void> resolveWith(Promise<? extends T> with) { - PromiseImpl<Void> chained = new PromiseImpl<Void>(); - ResolveWith resolveWith = new ResolveWith(chained); - with.then(resolveWith, resolveWith); + PromiseImpl<Void> chained = new PromiseImpl<>(); + with.onResolve(new ResolveWith(with, chained)); return chained; } @@ -330,40 +397,36 @@ final class PromiseImpl<T> implements Promise<T> { * * @Immutable */ - private final class ResolveWith implements Success<T, Void>, Failure { + private final class ResolveWith implements Runnable { + private final Promise< ? extends T> promise; private final PromiseImpl<Void> chained; - ResolveWith(PromiseImpl<Void> chained) { + ResolveWith(Promise< ? extends T> promise, PromiseImpl<Void> chained) { + this.promise = promise; this.chained = chained; } - public Promise<Void> call(Promise<T> with) throws Exception { - try { - resolve(with.getValue(), null); - } catch (Throwable e) { - chained.resolve(null, e); - return null; - } - chained.resolve(null, null); - return null; - } - - public void fail(Promise<?> with) throws Exception { + @Override + public void run() { + Throwable f = null; + Result<T> result = Result.collect(promise); try { - resolve(null, with.getFailure()); + resolve(result.value, result.fail); } catch (Throwable e) { - chained.resolve(null, e); - return; + f = e; // propagate new exception } - chained.resolve(null, null); + chained.tryResolve(null, f); } } /** * {@inheritDoc} */ + @Override public Promise<T> filter(Predicate<? super T> predicate) { - return then(new Filter<T>(predicate)); + PromiseImpl<T> chained = new PromiseImpl<>(); + onResolve(chained.new Filter(this, predicate)); + return chained; } /** @@ -371,26 +434,39 @@ final class PromiseImpl<T> implements Promise<T> { * * @Immutable */ - private static final class Filter<T> implements Success<T, T> { + private final class Filter implements Runnable { + private final Promise< ? extends T> promise; private final Predicate<? super T> predicate; - Filter(Predicate<? super T> predicate) { + Filter(Promise< ? extends T> promise, Predicate< ? super T> predicate) { + this.promise = promise; this.predicate = requireNonNull(predicate); } - public Promise<T> call(Promise<T> resolved) throws Exception { - if (predicate.test(resolved.getValue())) { - return resolved; + @Override + public void run() { + Result<T> result = Result.collect(promise); + if (result.fail == null) { + try { + if (!predicate.test(result.value)) { + result.fail = new NoSuchElementException(); + } + } catch (Throwable e) { // propagate new exception + result.fail = e; + } } - throw new NoSuchElementException(); + tryResolve(result.value, result.fail); } } /** * {@inheritDoc} */ + @Override public <R> Promise<R> map(Function<? super T, ? extends R> mapper) { - return then(new Map<T, R>(mapper)); + PromiseImpl<R> chained = new PromiseImpl<>(); + onResolve(chained.new Map<>(this, mapper)); + return chained; } /** @@ -398,23 +474,39 @@ final class PromiseImpl<T> implements Promise<T> { * * @Immutable */ - private static final class Map<T, R> implements Success<T, R> { - private final Function<? super T, ? extends R> mapper; + private final class Map<P> implements Runnable { + private final Promise< ? extends P> promise; + private final Function<? super P, ? extends T> mapper; - Map(Function<? super T, ? extends R> mapper) { + Map(Promise< ? extends P> promise, + Function< ? super P, ? extends T> mapper) { + this.promise = promise; this.mapper = requireNonNull(mapper); } - public Promise<R> call(Promise<T> resolved) throws Exception { - return new PromiseImpl<R>(mapper.apply(resolved.getValue()), null); + @Override + public void run() { + Result<P> result = Result.collect(promise); + T v = null; + if (result.fail == null) { + try { + v = mapper.apply(result.value); + } catch (Throwable e) { // propagate new exception + result.fail = e; + } + } + tryResolve(v, result.fail); } } /** * {@inheritDoc} */ + @Override public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) { - return then(new FlatMap<T, R>(mapper)); + PromiseImpl<R> chained = new PromiseImpl<>(); + onResolve(chained.new FlatMap<>(this, mapper)); + return chained; } /** @@ -422,26 +514,42 @@ final class PromiseImpl<T> implements Promise<T> { * * @Immutable */ - private static final class FlatMap<T, R> implements Success<T, R> { - private final Function<? super T, Promise<? extends R>> mapper; + private final class FlatMap<P> implements Runnable { + private final Promise< ? extends P> promise; + private final Function< ? super P,Promise< ? extends T>> mapper; - FlatMap(Function<? super T, Promise<? extends R>> mapper) { + FlatMap(Promise< ? extends P> promise, + Function< ? super P,Promise< ? extends T>> mapper) { + this.promise = promise; this.mapper = requireNonNull(mapper); } - @SuppressWarnings("unchecked") - public Promise<R> call(Promise<T> resolved) throws Exception { - return (Promise<R>) mapper.apply(resolved.getValue()); + @Override + public void run() { + Result<P> result = Result.collect(promise); + if (result.fail == null) { + Promise< ? extends T> flatmap = null; + try { + flatmap = mapper.apply(result.value); + } catch (Throwable e) { // propagate new exception + result.fail = e; + } + if (flatmap != null) { + flatmap.onResolve(new Chain(flatmap)); + return; + } + } + tryResolve(null, result.fail); } } /** * {@inheritDoc} */ + @Override public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) { - PromiseImpl<T> chained = new PromiseImpl<T>(); - Recover<T> recover = new Recover<T>(chained, recovery); - then(recover, recover); + PromiseImpl<T> chained = new PromiseImpl<>(); + onResolve(chained.new Recover(this, recovery)); return chained; } @@ -450,52 +558,41 @@ final class PromiseImpl<T> implements Promise<T> { * * @Immutable */ - private static final class Recover<T> implements Success<T, Void>, Failure { - private final PromiseImpl<T> chained; - private final Function<Promise<?>, ? extends T> recovery; + private final class Recover implements Runnable { + private final Promise<T> promise; + private final Function<Promise< ? >, ? extends T> recovery; - Recover(PromiseImpl<T> chained, Function<Promise<?>, ? extends T> recovery) { - this.chained = chained; + Recover(Promise<T> promise, + Function<Promise< ? >, ? extends T> recovery) { + this.promise = promise; this.recovery = requireNonNull(recovery); } - public Promise<Void> call(Promise<T> resolved) throws Exception { - T value; - try { - value = resolved.getValue(); - } catch (Throwable e) { - chained.resolve(null, e); - return null; - } - chained.resolve(value, null); - return null; - } - - public void fail(Promise<?> resolved) throws Exception { - T recovered; - Throwable failure; - try { - recovered = recovery.apply(resolved); - failure = resolved.getFailure(); - } catch (Throwable e) { - chained.resolve(null, e); - return; - } - if (recovered == null) { - chained.resolve(null, failure); - } else { - chained.resolve(recovered, null); + @Override + public void run() { + Result<T> result = Result.collect(promise); + if (result.fail != null) { + try { + T v = recovery.apply(promise); + if (v != null) { + result.value = v; + result.fail = null; + } + } catch (Throwable e) { // propagate new exception + result.fail = e; + } } + tryResolve(result.value, result.fail); } } /** * {@inheritDoc} */ + @Override public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) { - PromiseImpl<T> chained = new PromiseImpl<T>(); - RecoverWith<T> recoverWith = new RecoverWith<T>(chained, recovery); - then(recoverWith, recoverWith); + PromiseImpl<T> chained = new PromiseImpl<>(); + onResolve(chained.new RecoverWith(this, recovery)); return chained; } @@ -504,112 +601,364 @@ final class PromiseImpl<T> implements Promise<T> { * * @Immutable */ - private static final class RecoverWith<T> implements Success<T, Void>, Failure { - private final PromiseImpl<T> chained; + private final class RecoverWith implements Runnable { + private final Promise<T> promise; private final Function<Promise<?>, Promise<? extends T>> recovery; - RecoverWith(PromiseImpl<T> chained, Function<Promise<?>, Promise<? extends T>> recovery) { - this.chained = chained; + RecoverWith(Promise<T> promise, + Function<Promise< ? >,Promise< ? extends T>> recovery) { + this.promise = promise; this.recovery = requireNonNull(recovery); } - public Promise<Void> call(Promise<T> resolved) throws Exception { - T value; - try { - value = resolved.getValue(); - } catch (Throwable e) { - chained.resolve(null, e); - return null; + @Override + public void run() { + Result<T> result = Result.collect(promise); + if (result.fail != null) { + Promise< ? extends T> recovered = null; + try { + recovered = recovery.apply(promise); + } catch (Throwable e) { // propagate new exception + result.fail = e; + } + if (recovered != null) { + recovered.onResolve(new Chain(recovered)); + return; + } } - chained.resolve(value, null); - return null; + tryResolve(result.value, result.fail); } + } - public void fail(Promise<?> resolved) throws Exception { - Promise<? extends T> recovered; - Throwable failure; - try { - recovered = recovery.apply(resolved); - failure = resolved.getFailure(); - } catch (Throwable e) { - chained.resolve(null, e); + /** + * {@inheritDoc} + */ + @Override + public Promise<T> fallbackTo(Promise<? extends T> fallback) { + PromiseImpl<T> chained = new PromiseImpl<>(); + onResolve(chained.new FallbackTo(this, fallback)); + return chained; + } + + /** + * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method. + * + * @Immutable + */ + private final class FallbackTo implements Runnable { + private final Promise<T> promise; + private final Promise<? extends T> fallback; + + FallbackTo(Promise<T> promise, Promise< ? extends T> fallback) { + this.promise = promise; + this.fallback = requireNonNull(fallback); + } + + @Override + public void run() { + Result<T> result = Result.collect(promise); + if (result.fail != null) { + fallback.onResolve(new Chain(fallback, result.fail)); return; } - if (recovered == null) { - chained.resolve(null, failure); - } else { - recovered.onResolve(new Chain<T>(chained, recovered)); + tryResolve(result.value, result.fail); + } + } + + /** + * {@inheritDoc} + * + * @since 1.1 + */ + @Override + public Promise<T> timeout(long millis) { + PromiseImpl<T> chained = new PromiseImpl<>(); + if (!isDone()) { + onResolve(chained.new Timeout(millis, TimeUnit.MILLISECONDS)); + } + onResolve(chained.new Chain(this)); + return chained; + } + + /** + * Timeout class used by the {@link PromiseImpl#timeout(long)} method to + * cancel timeout when the Promise is resolved. + * + * @Immutable + * @since 1.1 + */ + private final class Timeout implements Runnable { + private final ScheduledFuture< ? > future; + + Timeout(long timeout, TimeUnit unit) { + future = Callbacks.schedule(new TimeoutAction(), timeout, unit); + } + + @Override + public void run() { + if (future != null) { + future.cancel(false); } } } + + /** + * Callback used to fail the Promise if the timeout expires. + * + * @Immutable + */ + private final class TimeoutAction implements Runnable { + TimeoutAction() {} + + @Override + public void run() { + tryResolve(null, new TimeoutException()); + } + } /** * {@inheritDoc} + * + * @since 1.1 */ - public Promise<T> fallbackTo(Promise<? extends T> fallback) { - PromiseImpl<T> chained = new PromiseImpl<T>(); - FallbackTo<T> fallbackTo = new FallbackTo<T>(chained, fallback); - then(fallbackTo, fallbackTo); + @Override + public Promise<T> delay(long millis) { + PromiseImpl<T> chained = new PromiseImpl<>(); + onResolve(new Delay(chained.new Chain(this), millis, + TimeUnit.MILLISECONDS)); return chained; } /** - * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method. + * Delay class used by the {@link PromiseImpl#delay(long)} method to delay + * chaining a promise. * * @Immutable + * @since 1.1 */ - private static final class FallbackTo<T> implements Success<T, Void>, Failure { - private final PromiseImpl<T> chained; - private final Promise<? extends T> fallback; + private static final class Delay implements Runnable { + private final Runnable callback; + private final long delay; + private final TimeUnit unit; + + Delay(Runnable callback, long delay, TimeUnit unit) { + this.callback = callback; + this.delay = delay; + this.unit = unit; + } - FallbackTo(PromiseImpl<T> chained, Promise<? extends T> fallback) { - this.chained = chained; - this.fallback = requireNonNull(fallback); + @Override + public void run() { + Callbacks.schedule(callback, delay, unit); } + } - public Promise<Void> call(Promise<T> resolved) throws Exception { - T value; + /** + * Callback handler used to asynchronously execute callbacks. + * + * @Immutable + * @since 1.1 + */ + private static final class Callbacks + implements ThreadFactory, RejectedExecutionHandler, Runnable { + private static final Callbacks callbacks; + private static final ScheduledExecutor scheduledExecutor; + private static final ThreadPoolExecutor callbackExecutor; + static { + callbacks = new Callbacks(); + scheduledExecutor = new ScheduledExecutor(2, callbacks); + callbackExecutor = new ThreadPoolExecutor(0, 64, 60L, + TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + callbacks, callbacks); + } + + /** + * Schedule a callback on the scheduled executor + */ + static ScheduledFuture< ? > schedule(Runnable callback, long delay, + TimeUnit unit) { try { - value = resolved.getValue(); - } catch (Throwable e) { - chained.resolve(null, e); + return scheduledExecutor.schedule(callback, delay, unit); + } catch (RejectedExecutionException e) { + callbacks.rejectedExecution(callback, scheduledExecutor); return null; } - chained.resolve(value, null); - return null; } - public void fail(Promise<?> resolved) throws Exception { - Throwable failure; + /** + * Execute a callback on the callback executor + */ + static void execute(Runnable callback) { + callbackExecutor.execute(callback); + } + + static void uncaughtException(Throwable t) { try { - failure = resolved.getFailure(); - } catch (Throwable e) { - chained.resolve(null, e); - return; + Thread thread = Thread.currentThread(); + thread.getUncaughtExceptionHandler().uncaughtException(thread, + t); + } catch (Throwable ignored) { + // we ignore this } - fallback.onResolve(new Chain<T>(chained, fallback, failure)); } - } - static <V> V requireNonNull(V value) { - if (value != null) { - return value; + private final AtomicBoolean shutdownHookInstalled; + private final ThreadFactory delegateThreadFactory; + + private Callbacks() { + shutdownHookInstalled = new AtomicBoolean(); + delegateThreadFactory = Executors.defaultThreadFactory(); + } + + /** + * Executor threads should not prevent VM from exiting + */ + @Override + public Thread newThread(Runnable r) { + if (shutdownHookInstalled.compareAndSet(false, true)) { + Thread shutdownThread = delegateThreadFactory.newThread(this); + shutdownThread.setName("ExecutorShutdownHook," + + shutdownThread.getName()); + try { + Runtime.getRuntime().addShutdownHook(shutdownThread); + } catch (IllegalStateException e) { + // VM is already shutting down... + callbackExecutor.shutdown(); + scheduledExecutor.shutdown(); + } + } + Thread t = delegateThreadFactory.newThread(r); + t.setName("PromiseImpl," + t.getName()); + t.setDaemon(true); + return t; + } + + /** + * Call the callback using the caller's thread because the thread pool + * rejected the execution. + */ + @Override + public void rejectedExecution(Runnable callback, + ThreadPoolExecutor executor) { + try { + callback.run(); + } catch (Throwable t) { + uncaughtException(t); + } + } + + /** + * Shutdown hook + */ + @Override + public void run() { + // limit new thread creation + callbackExecutor.setMaximumPoolSize( + Math.max(1, callbackExecutor.getPoolSize())); + // Run all delayed callbacks now + scheduledExecutor.shutdown(); + BlockingQueue<Runnable> queue = scheduledExecutor.getQueue(); + if (!queue.isEmpty()) { + for (Object r : queue.toArray()) { + if (r instanceof RunnableScheduledFuture< ? >) { + RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r; + if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L) + && queue.remove(future)) { + future.run(); + scheduledExecutor.afterExecute(future, null); + } + } + } + scheduledExecutor.shutdown(); + } + try { + scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + // Shutdown callback executor + callbackExecutor.shutdown(); + try { + callbackExecutor.awaitTermination(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * ScheduledThreadPoolExecutor for scheduled execution. + * + * @ThreadSafe + */ + private static final class ScheduledExecutor + extends ScheduledThreadPoolExecutor { + ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + } + + /** + * Handle uncaught exceptions + */ + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if ((t == null) && (r instanceof Future< ? >)) { + boolean interrupted = Thread.interrupted(); + try { + ((Future< ? >) r).get(); + } catch (CancellationException e) { + // ignore + } catch (InterruptedException e) { + interrupted = true; + } catch (ExecutionException e) { + t = e.getCause(); + } finally { + if (interrupted) { // restore interrupt status + Thread.currentThread().interrupt(); + } + } + } + if (t != null) { + uncaughtException(t); + } + } } - throw new NullPointerException(); } /** - * Use the lazy initialization holder class idiom to delay creating a Logger - * until we actually need it. + * A holder of the result of a promise. + * + * @NotThreadSafe */ - private static final class Logger { - private final static java.util.logging.Logger LOGGER; - static { - LOGGER = java.util.logging.Logger.getLogger(PromiseImpl.class.getName()); + static final class Result<P> { + Throwable fail; + P value; + + Result() {} + + static <R> Result<R> collect(Promise< ? extends R> promise) { + Result<R> result = new Result<>(); + final boolean interrupted = Thread.interrupted(); + try { + result.fail = promise.getFailure(); + if (result.fail == null) { + result.value = promise.getValue(); + } + } catch (Throwable e) { + result.fail = e; // propagate new exception + } finally { + if (interrupted) { // restore interrupt status + Thread.currentThread().interrupt(); + } + } + return result; } + } - static void logCallbackException(Throwable t) { - LOGGER.log(java.util.logging.Level.WARNING, "Exception from Promise callback", t); + static <V> V requireNonNull(V value) { + if (value != null) { + return value; } + throw new NullPointerException(); } } diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java index 06d506675..7067fe50a 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,12 +17,15 @@ package org.osgi.util.promise; import static org.osgi.util.promise.PromiseImpl.requireNonNull; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.osgi.util.promise.PromiseImpl.Result; + /** * Static helper methods for {@link Promise}s. * @@ -42,7 +45,7 @@ public class Promises { * @return A new Promise that has been resolved with the specified value. */ public static <T> Promise<T> resolved(T value) { - return new PromiseImpl<T>(value, null); + return new PromiseImpl<>(value, null); } /** @@ -54,7 +57,7 @@ public class Promises { * @return A new Promise that has been resolved with the specified failure. */ public static <T> Promise<T> failed(Throwable failure) { - return new PromiseImpl<T>(null, requireNonNull(failure)); + return new PromiseImpl<>(null, requireNonNull(failure)); } /** @@ -85,13 +88,14 @@ public class Promises { */ public static <T, S extends T> Promise<List<T>> all(Collection<Promise<S>> promises) { if (promises.isEmpty()) { - List<T> result = new ArrayList<T>(); + List<T> result = new ArrayList<>(); return resolved(result); } /* make a copy and capture the ordering */ - List<Promise<? extends T>> list = new ArrayList<Promise<? extends T>>(promises); - PromiseImpl<List<T>> chained = new PromiseImpl<List<T>>(); - All<T> all = new All<T>(chained, list); + List<Promise< ? extends T>> list = new ArrayList<Promise< ? extends T>>( + promises); + PromiseImpl<List<T>> chained = new PromiseImpl<>(); + All<T> all = new All<>(chained, list); for (Promise<? extends T> promise : list) { promise.onResolve(all); } @@ -121,6 +125,7 @@ public class Promises { * {@link FailedPromisesException} must contain all of the specified * Promises which resolved with a failure. */ + @SafeVarargs public static <T> Promise<List<T>> all(Promise<? extends T>... promises) { @SuppressWarnings("unchecked") List<Promise<T>> list = Arrays.asList((Promise<T>[]) promises); @@ -144,36 +149,30 @@ public class Promises { this.promiseCount = new AtomicInteger(promises.size()); } + @Override public void run() { if (promiseCount.decrementAndGet() != 0) { return; } - List<T> result = new ArrayList<T>(promises.size()); - List<Promise<?>> failed = new ArrayList<Promise<?>>(promises.size()); + List<T> value = new ArrayList<>(promises.size()); + List<Promise<?>> failed = new ArrayList<>(promises.size()); Throwable cause = null; for (Promise<? extends T> promise : promises) { - Throwable failure; - T value; - try { - failure = promise.getFailure(); - value = (failure != null) ? null : promise.getValue(); - } catch (Throwable e) { - chained.resolve(null, e); - return; - } - if (failure != null) { + Result<T> result = Result.collect(promise); + if (result.fail != null) { failed.add(promise); if (cause == null) { - cause = failure; + cause = result.fail; } } else { - result.add(value); + value.add(result.value); } } if (failed.isEmpty()) { - chained.resolve(result, null); + chained.tryResolve(value, null); } else { - chained.resolve(null, new FailedPromisesException(failed, cause)); + chained.tryResolve(null, + new FailedPromisesException(failed, cause)); } } } diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Success.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Success.java index c29fc4fb6..cedefa1a5 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Success.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Success.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2015). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ import org.osgi.annotation.versioning.ConsumerType; * @author $Id$ */ @ConsumerType +@FunctionalInterface public interface Success<T, R> { /** * Success callback for a Promise. diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/TimeoutException.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/TimeoutException.java new file mode 100644 index 000000000..09186f552 --- /dev/null +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/TimeoutException.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) OSGi Alliance (2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.promise; + +/** + * Timeout exception for a Promise. + * + * @since 1.1 + * @author $Id$ + */ +public class TimeoutException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Create a new {@code TimeoutException}. + */ + public TimeoutException() { + super(); + } +} diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/package-info.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/package-info.java index 5a3ec65d3..56e2d8e4b 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/package-info.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/package-info.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2014). All Rights Reserved. + * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,25 +15,23 @@ */ /** - * Promise Package Version 1.0. - * + * Promise Package Version 1.1. * <p> * Bundles wishing to use this package must list the package in the * Import-Package header of the bundle's manifest. - * * <p> * Example import for consumers using the API in this package: * <p> - * {@code Import-Package: org.osgi.util.promise; version="[1.0,2.0)"} + * {@code Import-Package: org.osgi.util.promise; version="[1.1,2.0)"} * <p> * Example import for providers implementing the API in this package: * <p> - * {@code Import-Package: org.osgi.util.promise; version="[1.0,1.1)"} + * {@code Import-Package: org.osgi.util.promise; version="[1.1,1.2)"} * * @author $Id$ */ -@Version("1.0") +@Version("1.1") package org.osgi.util.promise; import org.osgi.annotation.versioning.Version; diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/packageinfo b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/packageinfo deleted file mode 100644 index 7c8de0324..000000000 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/packageinfo +++ /dev/null @@ -1 +0,0 @@ -version 1.0 |