Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Watson2017-04-28 16:49:04 +0000
committerThomas Watson2017-06-16 12:38:08 +0000
commit9c940504036225a9c7d3f869f6d1ca0f55083e50 (patch)
tree0457ec5d293c13fbbaa124dcff9bc0e06240a21a
parent89617708753627e2aecdea7393d64ccf24a0b0eb (diff)
downloadrt.equinox.framework-9c940504036225a9c7d3f869f6d1ca0f55083e50.tar.gz
rt.equinox.framework-9c940504036225a9c7d3f869f6d1ca0f55083e50.tar.xz
rt.equinox.framework-9c940504036225a9c7d3f869f6d1ca0f55083e50.zip
Update promise and function API to 1.1
Change-Id: I606cabb03fb6a9ea7f1aed9757a197e3d70480d8 Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
-rw-r--r--bundles/org.eclipse.osgi.util/.classpath2
-rw-r--r--bundles/org.eclipse.osgi.util/.settings/org.eclipse.jdt.core.prefs6
-rw-r--r--bundles/org.eclipse.osgi.util/META-INF/MANIFEST.MF10
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java40
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Function.java6
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Predicate.java6
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/function/package-info.java12
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/function/packageinfo1
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java21
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Failure.java3
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java95
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java789
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java45
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Success.java3
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/TimeoutException.java34
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/package-info.java12
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/packageinfo1
17 files changed, 786 insertions, 300 deletions
diff --git a/bundles/org.eclipse.osgi.util/.classpath b/bundles/org.eclipse.osgi.util/.classpath
index 09b9f31b1..7ecc55afe 100644
--- a/bundles/org.eclipse.osgi.util/.classpath
+++ b/bundles/org.eclipse.osgi.util/.classpath
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.5"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
<classpathentry kind="src" path="src">
<attributes>
diff --git a/bundles/org.eclipse.osgi.util/.settings/org.eclipse.jdt.core.prefs b/bundles/org.eclipse.osgi.util/.settings/org.eclipse.jdt.core.prefs
index a29e75467..99c66c2d6 100644
--- a/bundles/org.eclipse.osgi.util/.settings/org.eclipse.jdt.core.prefs
+++ b/bundles/org.eclipse.osgi.util/.settings/org.eclipse.jdt.core.prefs
@@ -13,9 +13,9 @@ org.eclipse.jdt.core.compiler.annotation.nonnullbydefault=org.eclipse.jdt.annota
org.eclipse.jdt.core.compiler.annotation.nullable=org.eclipse.jdt.annotation.Nullable
org.eclipse.jdt.core.compiler.annotation.nullanalysis=disabled
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.5
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
-org.eclipse.jdt.core.compiler.compliance=1.5
+org.eclipse.jdt.core.compiler.compliance=1.7
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
@@ -113,6 +113,6 @@ org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=warning
org.eclipse.jdt.core.compiler.problem.unusedTypeParameter=ignore
org.eclipse.jdt.core.compiler.problem.unusedWarningToken=warning
org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=warning
-org.eclipse.jdt.core.compiler.source=1.5
+org.eclipse.jdt.core.compiler.source=1.7
org.eclipse.jdt.core.incompatibleJDKLevel=ignore
org.eclipse.jdt.core.incompleteClasspath=error
diff --git a/bundles/org.eclipse.osgi.util/META-INF/MANIFEST.MF b/bundles/org.eclipse.osgi.util/META-INF/MANIFEST.MF
index 7e7bf9de7..19894f6df 100644
--- a/bundles/org.eclipse.osgi.util/META-INF/MANIFEST.MF
+++ b/bundles/org.eclipse.osgi.util/META-INF/MANIFEST.MF
@@ -8,16 +8,16 @@ Bundle-Vendor: %eclipse.org
Bundle-Localization: plugin
Bundle-DocUrl: http://www.eclipse.org
Bundle-ContactAddress: www.eclipse.org
-Export-Package: org.osgi.util.function;version="1.0",
+Export-Package: org.osgi.util.function;version="1.1",
org.osgi.util.measurement;version="1.0.1",
org.osgi.util.position;version="1.0.1";uses:="org.osgi.util.measurement",
- org.osgi.util.promise;version="1.0";uses:="org.osgi.util.function",
+ org.osgi.util.promise;version="1.1";uses:="org.osgi.util.function",
org.osgi.util.xml;version="1.0.1";uses:="org.osgi.framework,javax.xml.parsers"
Import-Package: org.osgi.framework; version=1.1,
javax.xml.parsers,
- org.osgi.util.function; version="[1.0, 1.1)",
+ org.osgi.util.function; version="[1.1, 1.2)",
org.osgi.util.measurement; version="[1.0.1, 1.1)",
org.osgi.util.position; version="[1.0.1, 1.1)",
- org.osgi.util.promise; version="[1.0, 1.1)",
+ org.osgi.util.promise; version="[1.1, 1.2)",
org.osgi.util.xml; version="[1.0.1, 1.1)"
-Bundle-RequiredExecutionEnvironment: J2SE-1.5
+Bundle-RequiredExecutionEnvironment: JavaSE-1.7
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java
new file mode 100644
index 000000000..17ff376bc
--- /dev/null
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Callback.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) OSGi Alliance (2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.function;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * A callback that performs an operation and may throw an exception.
+ * <p>
+ * This is a functional interface and can be used as the assignment target for a
+ * lambda expression or method reference.
+ *
+ * @ThreadSafe
+ * @since 1.1
+ * @author $Id$
+ */
+@ConsumerType
+@FunctionalInterface
+public interface Callback {
+ /**
+ * Execute the callback.
+ *
+ * @throws Exception An exception thrown by the method.
+ */
+ void run() throws Exception;
+}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Function.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Function.java
index 5d812f75c..3d17c97c7 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Function.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Function.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,12 +32,14 @@ import org.osgi.annotation.versioning.ConsumerType;
* @author $Id$
*/
@ConsumerType
+@FunctionalInterface
public interface Function<T, R> {
/**
* Applies this function to the specified argument.
*
* @param t The input to this function.
* @return The output of this function.
+ * @throws Exception An exception thrown by the method.
*/
- R apply(T t);
+ R apply(T t) throws Exception;
}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Predicate.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Predicate.java
index 0c2c61f78..681b771c2 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Predicate.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/Predicate.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@ import org.osgi.annotation.versioning.ConsumerType;
* @author $Id$
*/
@ConsumerType
+@FunctionalInterface
public interface Predicate<T> {
/**
* Evaluates this predicate on the specified argument.
@@ -38,6 +39,7 @@ public interface Predicate<T> {
* @param t The input to this predicate.
* @return {@code true} if the specified argument is accepted by this
* predicate; {@code false} otherwise.
+ * @throws Exception An exception thrown by the method.
*/
- boolean test(T t);
+ boolean test(T t) throws Exception;
}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/package-info.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/package-info.java
index 899d786b2..82ed82dc4 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/package-info.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/package-info.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,25 +15,23 @@
*/
/**
- * Function Package Version 1.0.
- *
+ * Function Package Version 1.1.
* <p>
* Bundles wishing to use this package must list the package in the
* Import-Package header of the bundle's manifest.
- *
* <p>
* Example import for consumers using the API in this package:
* <p>
- * {@code Import-Package: org.osgi.util.function; version="[1.0,2.0)"}
+ * {@code Import-Package: org.osgi.util.function; version="[1.1,2.0)"}
* <p>
* Example import for providers implementing the API in this package:
* <p>
- * {@code Import-Package: org.osgi.util.function; version="[1.0,1.1)"}
+ * {@code Import-Package: org.osgi.util.function; version="[1.1,1.2)"}
*
* @author $Id$
*/
-@Version("1.0")
+@Version("1.1")
package org.osgi.util.function;
import org.osgi.annotation.versioning.Version;
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/packageinfo b/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/packageinfo
deleted file mode 100644
index 7c8de0324..000000000
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/function/packageinfo
+++ /dev/null
@@ -1 +0,0 @@
-version 1.0
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java
index e9ff0a6fa..004925fa1 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,7 +45,7 @@ public class Deferred<T> {
* Create a new Deferred with an associated Promise.
*/
public Deferred() {
- promise = new PromiseImpl<T>();
+ promise = new PromiseImpl<>();
}
/**
@@ -64,7 +64,7 @@ public class Deferred<T> {
* After the associated Promise is resolved with the specified value, all
* registered {@link Promise#onResolve(Runnable) callbacks} are called and
* any {@link Promise#then(Success, Failure) chained} Promises are resolved.
- *
+ * This may occur asynchronously to this method.
* <p>
* Resolving the associated Promise <i>happens-before</i> any registered
* callback is called. That is, in a registered callback,
@@ -87,7 +87,7 @@ public class Deferred<T> {
* After the associated Promise is resolved with the specified failure, all
* registered {@link Promise#onResolve(Runnable) callbacks} are called and
* any {@link Promise#then(Success, Failure) chained} Promises are resolved.
- *
+ * This may occur asynchronously to this method.
* <p>
* Resolving the associated Promise <i>happens-before</i> any registered
* callback is called. That is, in a registered callback,
@@ -118,7 +118,7 @@ public class Deferred<T> {
* After the associated Promise is resolved with the specified Promise, all
* registered {@link Promise#onResolve(Runnable) callbacks} are called and
* any {@link Promise#then(Success, Failure) chained} Promises are resolved.
- *
+ * This may occur asynchronously to this method.
* <p>
* Resolving the associated Promise <i>happens-before</i> any registered
* callback is called. That is, in a registered callback,
@@ -139,4 +139,15 @@ public class Deferred<T> {
public Promise<Void> resolveWith(Promise<? extends T> with) {
return promise.resolveWith(with);
}
+
+ /**
+ * Returns a string representation of the associated Promise.
+ *
+ * @return A string representation of the associated Promise.
+ * @since 1.1
+ */
+ @Override
+ public String toString() {
+ return promise.toString();
+ }
}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Failure.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Failure.java
index 4e6a17884..9d491b595 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Failure.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Failure.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2015). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@ import org.osgi.annotation.versioning.ConsumerType;
* @author $Id$
*/
@ConsumerType
+@FunctionalInterface
public interface Failure {
/**
* Failure callback for a Promise.
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java
index ffa193393..f9e4ef5d2 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promise.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,13 +17,14 @@
package org.osgi.util.promise;
import java.lang.reflect.InvocationTargetException;
+
import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.function.Callback;
import org.osgi.util.function.Function;
import org.osgi.util.function.Predicate;
/**
* A Promise of a value.
- *
* <p>
* A Promise represents a future value. It handles the interactions for
* asynchronous processing. A {@link Deferred} object can be used to create a
@@ -33,7 +34,6 @@ import org.osgi.util.function.Predicate;
* or the Promise can be used in chaining. In chaining, callbacks are provided
* that receive the resolved Promise, and a new Promise is generated that
* resolves based upon the result of a callback.
- *
* <p>
* Both {@link #onResolve(Runnable) callbacks} and
* {@link #then(Success, Failure) chaining} can be repeated any number of times,
@@ -42,32 +42,19 @@ import org.osgi.util.function.Predicate;
* Example callback usage:
*
* <pre>
- * final Promise&lt;String&gt; foo = foo();
- * foo.onResolve(new Runnable() {
- * public void run() {
- * System.out.println(foo.getValue());
- * }
- * });
+ * Promise&lt;String&gt; foo = foo();
+ * foo.onResolve(() -&gt; System.out.println("resolved"));
* </pre>
*
* Example chaining usage;
*
* <pre>
- * Success&lt;String,String&gt; doubler = new Success&lt;String,String&gt;() {
- * public Promise&lt;String&gt; call(Promise&lt;String&gt; p) throws Exception {
- * return Promises.resolved(p.getValue()+p.getValue());
- * }
- * };
- * final Promise&lt;String&gt; foo = foo().then(doubler).then(doubler);
- * foo.onResolve(new Runnable() {
- * public void run() {
- * System.out.println(foo.getValue());
- * }
- * });
+ * Success&lt;String,String&gt; doubler = p -&gt; Promises
+ * .resolved(p.getValue() + p.getValue());
+ * Promise&lt;String&gt; foo = foo().then(doubler).then(doubler);
* </pre>
*
* @param <T> The value type associated with this Promise.
- *
* @ThreadSafe
* @author $Id$
*/
@@ -231,6 +218,40 @@ public interface Promise<T> {
<R> Promise<R> then(Success<? super T, ? extends R> success);
/**
+ * Chain a new Promise to this Promise with a callback.
+ * <p>
+ * The specified {@link Callback} is called when this Promise is resolved
+ * either successfully or with a failure.
+ * <p>
+ * This method returns a new Promise which is chained to this Promise. The
+ * returned Promise must be resolved when this Promise is resolved after the
+ * specified callback is executed. If the callback throws an exception, the
+ * returned Promise is failed with that exception. Otherwise the returned
+ * Promise is resolved with this Promise.
+ * <p>
+ * This method may be called at any time including before and after this
+ * Promise has been resolved.
+ * <p>
+ * Resolving this Promise <i>happens-before</i> any registered callback is
+ * called. That is, in a registered callback, {@link #isDone()} must return
+ * {@code true} and {@link #getValue()} and {@link #getFailure()} must not
+ * block.
+ * <p>
+ * A callback may be called on a different thread than the thread which
+ * registered the callback. So the callback must be thread safe but can rely
+ * upon that the registration of the callback <i>happens-before</i> the
+ * registered callback is called.
+ *
+ * @param callback A callback to be called when this Promise is resolved.
+ * Must not be {@code null}.
+ * @return A new Promise which is chained to this Promise. The returned
+ * Promise must be resolved when this Promise is resolved after the
+ * specified callback is executed.
+ * @since 1.1
+ */
+ Promise<T> then(Callback callback);
+
+ /**
* Filter the value of this Promise.
*
* <p>
@@ -400,4 +421,36 @@ public interface Promise<T> {
* the value of the specified Promise.
*/
Promise<T> fallbackTo(Promise<? extends T> fallback);
+
+ /**
+ * Time out the resolution of this Promise.
+ * <p>
+ * If this Promise is successfully resolved before the timeout, the returned
+ * Promise is resolved with the value of this Promise. If this Promise is
+ * resolved with a failure before the timeout, the returned Promise is
+ * resolved with the failure of this Promise. If the timeout is reached
+ * before this Promise is resolved, the returned Promise is failed with a
+ * {@link TimeoutException}.
+ *
+ * @param milliseconds The time to wait in milliseconds. Zero and negative
+ * time is treated as an immediate timeout.
+ * @return A Promise that is resolved when either this Promise is resolved
+ * or the specified timeout is reached.
+ * @since 1.1
+ */
+ Promise<T> timeout(long milliseconds);
+
+ /**
+ * Delay after the resolution of this Promise.
+ * <p>
+ * Once this Promise is resolved, resolve the returned Promise with this
+ * Promise after the specified delay.
+ *
+ * @param milliseconds The time to delay in milliseconds. Zero and negative
+ * time is treated as no delay.
+ * @return A Promise that is resolved with this Promise after this Promise
+ * is resolved and the specified delay has elapsed.
+ * @since 1.1
+ */
+ Promise<T> delay(long milliseconds);
}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
index ca2be5e80..2afb3771f 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,8 +18,25 @@ package org.osgi.util.promise;
import java.lang.reflect.InvocationTargetException;
import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.osgi.util.function.Callback;
import org.osgi.util.function.Function;
import org.osgi.util.function.Predicate;
@@ -44,12 +61,10 @@ final class PromiseImpl<T> implements Promise<T> {
private final ConcurrentLinkedQueue<Runnable> callbacks;
/**
* A CountDownLatch to manage the resolved state of this Promise.
- *
* <p>
* This object is used as the synchronizing object to provide a critical
- * section in {@link #resolve(Object, Throwable)} so that only a single
+ * section in {@link #tryResolve(Object, Throwable)} so that only a single
* thread can write the resolved state variables and open the latch.
- *
* <p>
* The resolved state variables, {@link #value} and {@link #fail}, must only
* be written when the latch is closed (getCount() != 0) and must only be
@@ -78,7 +93,7 @@ final class PromiseImpl<T> implements Promise<T> {
* Initialize this Promise.
*/
PromiseImpl() {
- callbacks = new ConcurrentLinkedQueue<Runnable>();
+ callbacks = new ConcurrentLinkedQueue<>();
resolved = new CountDownLatch(1);
}
@@ -89,34 +104,63 @@ final class PromiseImpl<T> implements Promise<T> {
* @param f The failure of this resolved Promise.
*/
PromiseImpl(T v, Throwable f) {
- value = v;
- fail = f;
- callbacks = new ConcurrentLinkedQueue<Runnable>();
+ if (f == null) {
+ value = v;
+ } else {
+ fail = f;
+ }
+ callbacks = new ConcurrentLinkedQueue<>();
resolved = new CountDownLatch(0);
}
/**
- * Resolve this Promise.
+ * Try to resolve this Promise.
+ * <p>
+ * If this Promise was already resolved, return false. Otherwise, resolve
+ * this Promise and return true.
*
* @param v The value of this Promise.
* @param f The failure of this Promise.
+ * @return false if this Promise was already resolved; true if this method
+ * resolved this Promise.
+ * @since 1.1
*/
- void resolve(T v, Throwable f) {
+ boolean tryResolve(T v, Throwable f) {
// critical section: only one resolver at a time
synchronized (resolved) {
if (resolved.getCount() == 0) {
- throw new IllegalStateException("Already resolved");
+ return false;
}
/*
* The resolved state variables must be set before opening the
* latch. This safely publishes them to be read by other threads
* that must verify the latch is open before reading.
*/
- value = v;
- fail = f;
+ if (f == null) {
+ value = v;
+ } else {
+ fail = f;
+ }
resolved.countDown();
}
notifyCallbacks(); // call any registered callbacks
+ return true;
+ }
+
+ /**
+ * Resolve this Promise.
+ * <p>
+ * If this Promise was already resolved, throw IllegalStateException.
+ * Otherwise, resolve this Promise.
+ *
+ * @param v The value of this Promise.
+ * @param f The failure of this Promise.
+ * @throws IllegalStateException If this Promise was already resolved.
+ */
+ void resolve(T v, Throwable f) {
+ if (!tryResolve(v, f)) {
+ throw new IllegalStateException("Already resolved");
+ }
}
/**
@@ -129,21 +173,18 @@ final class PromiseImpl<T> implements Promise<T> {
/*
* Note: multiple threads can be in this method removing callbacks from
- * the queue and calling them, so the order in which callbacks are
- * called cannot be specified.
+ * the queue and executing them, so the order in which callbacks are
+ * executed cannot be specified.
*/
for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) {
- try {
- callback.run();
- } catch (Throwable t) {
- Logger.logCallbackException(t);
- }
+ Callbacks.execute(callback);
}
}
/**
* {@inheritDoc}
*/
+ @Override
public boolean isDone() {
return resolved.getCount() == 0;
}
@@ -151,6 +192,7 @@ final class PromiseImpl<T> implements Promise<T> {
/**
* {@inheritDoc}
*/
+ @Override
public T getValue() throws InvocationTargetException, InterruptedException {
resolved.await();
if (fail == null) {
@@ -162,14 +204,40 @@ final class PromiseImpl<T> implements Promise<T> {
/**
* {@inheritDoc}
*/
+ @Override
public Throwable getFailure() throws InterruptedException {
resolved.await();
return fail;
}
/**
+ * @since 1.1
+ */
+ @Override
+ public String toString() {
+ if (!isDone()) {
+ return super.toString() + "[unresolved]";
+ }
+ final boolean interrupted = Thread.interrupted();
+ try {
+ Throwable t = getFailure();
+ if (t != null) {
+ return super.toString() + "[failed: " + t + "]";
+ }
+ return super.toString() + "[resolved: " + getValue() + "]";
+ } catch (InterruptedException | InvocationTargetException e) {
+ return super.toString() + "[" + e + "]";
+ } finally {
+ if (interrupted) { // restore interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
* {@inheritDoc}
*/
+ @Override
public Promise<T> onResolve(Runnable callback) {
callbacks.offer(callback);
notifyCallbacks(); // call any registered callbacks
@@ -179,15 +247,17 @@ final class PromiseImpl<T> implements Promise<T> {
/**
* {@inheritDoc}
*/
+ @Override
public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) {
- PromiseImpl<R> chained = new PromiseImpl<R>();
- onResolve(new Then<R>(chained, success, failure));
+ PromiseImpl<R> chained = new PromiseImpl<>();
+ onResolve(chained.new Then<>(this, success, failure));
return chained;
}
/**
* {@inheritDoc}
*/
+ @Override
public <R> Promise<R> then(Success<? super T, ? extends R> success) {
return then(success, null);
}
@@ -198,58 +268,44 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @Immutable
*/
- private final class Then<R> implements Runnable {
- private final PromiseImpl<R> chained;
- private final Success<T, ? extends R> success;
+ private final class Then<P> implements Runnable {
+ private final Promise<P> promise;
+ private final Success<P, ? extends T> success;
private final Failure failure;
@SuppressWarnings("unchecked")
- Then(PromiseImpl<R> chained, Success<? super T, ? extends R> success, Failure failure) {
- this.chained = chained;
- this.success = (Success<T, ? extends R>) success;
+ Then(Promise<P> promise, Success< ? super P, ? extends T> success,
+ Failure failure) {
+ this.promise = promise;
+ this.success = (Success<P, ? extends T>) success;
this.failure = failure;
}
+ @Override
public void run() {
- Throwable f;
- final boolean interrupted = Thread.interrupted();
- try {
- f = getFailure();
- } catch (Throwable e) {
- f = e; // propagate new exception
- } finally {
- if (interrupted) { // restore interrupt status
- Thread.currentThread().interrupt();
- }
- }
+ Throwable f = Result.collect(promise).fail;
if (f != null) {
if (failure != null) {
try {
- failure.fail(PromiseImpl.this);
+ failure.fail(promise);
} catch (Throwable e) {
f = e; // propagate new exception
}
}
- // fail chained
- chained.resolve(null, f);
- return;
- }
- Promise<? extends R> returned = null;
- if (success != null) {
+ } else if (success != null) {
+ Promise< ? extends T> returned = null;
try {
- returned = success.call(PromiseImpl.this);
+ returned = success.call(promise);
} catch (Throwable e) {
- chained.resolve(null, e);
+ f = e; // propagate new exception
+ }
+ if (returned != null) {
+ // resolve chained when returned promise is resolved
+ returned.onResolve(new Chain(returned));
return;
}
}
- if (returned == null) {
- // resolve chained with null value
- chained.resolve(null, null);
- } else {
- // resolve chained when returned promise is resolved
- returned.onResolve(new Chain<R>(chained, returned));
- }
+ tryResolve(null, f);
}
}
@@ -259,46 +315,58 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @Immutable
*/
- private final static class Chain<R> implements Runnable {
- private final PromiseImpl<R> chained;
- private final Promise<? extends R> promise;
+ private final class Chain implements Runnable {
+ private final Promise< ? extends T> promise;
private final Throwable failure;
+ private final Callback callback;
- Chain(PromiseImpl<R> chained, Promise<? extends R> promise) {
- this.chained = chained;
+ Chain(Promise< ? extends T> promise) {
this.promise = promise;
this.failure = null;
+ this.callback = null;
}
- Chain(PromiseImpl<R> chained, Promise<? extends R> promise, Throwable failure) {
- this.chained = chained;
+ Chain(Promise< ? extends T> promise, Throwable failure) {
this.promise = promise;
- this.failure = failure;
+ this.failure = requireNonNull(failure);
+ this.callback = null;
+ }
+
+ Chain(Promise< ? extends T> promise, Callback callback) {
+ this.promise = promise;
+ this.failure = null;
+ this.callback = requireNonNull(callback);
}
+ @Override
public void run() {
- R value = null;
- Throwable f;
- final boolean interrupted = Thread.interrupted();
- try {
- f = promise.getFailure();
- if (f == null) {
- value = promise.getValue();
- } else if (failure != null) {
- f = failure;
- }
- } catch (Throwable e) {
- f = e; // propagate new exception
- } finally {
- if (interrupted) { // restore interrupt status
- Thread.currentThread().interrupt();
+ if (callback != null) {
+ try {
+ callback.run();
+ } catch (Throwable e) {
+ tryResolve(null, e);
+ return;
}
}
- chained.resolve(value, f);
+ Result<T> result = Result.collect(promise);
+ if ((result.fail != null) && (failure != null)) {
+ result.fail = failure;
+ }
+ tryResolve(result.value, result.fail);
}
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public Promise<T> then(Callback callback) {
+ PromiseImpl<T> chained = new PromiseImpl<>();
+ onResolve(chained.new Chain(this, callback));
+ return chained;
+ }
+
+ /**
* Resolve this Promise with the specified Promise.
*
* <p>
@@ -318,9 +386,8 @@ final class PromiseImpl<T> implements Promise<T> {
* resolved.
*/
Promise<Void> resolveWith(Promise<? extends T> with) {
- PromiseImpl<Void> chained = new PromiseImpl<Void>();
- ResolveWith resolveWith = new ResolveWith(chained);
- with.then(resolveWith, resolveWith);
+ PromiseImpl<Void> chained = new PromiseImpl<>();
+ with.onResolve(new ResolveWith(with, chained));
return chained;
}
@@ -330,40 +397,36 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @Immutable
*/
- private final class ResolveWith implements Success<T, Void>, Failure {
+ private final class ResolveWith implements Runnable {
+ private final Promise< ? extends T> promise;
private final PromiseImpl<Void> chained;
- ResolveWith(PromiseImpl<Void> chained) {
+ ResolveWith(Promise< ? extends T> promise, PromiseImpl<Void> chained) {
+ this.promise = promise;
this.chained = chained;
}
- public Promise<Void> call(Promise<T> with) throws Exception {
- try {
- resolve(with.getValue(), null);
- } catch (Throwable e) {
- chained.resolve(null, e);
- return null;
- }
- chained.resolve(null, null);
- return null;
- }
-
- public void fail(Promise<?> with) throws Exception {
+ @Override
+ public void run() {
+ Throwable f = null;
+ Result<T> result = Result.collect(promise);
try {
- resolve(null, with.getFailure());
+ resolve(result.value, result.fail);
} catch (Throwable e) {
- chained.resolve(null, e);
- return;
+ f = e; // propagate new exception
}
- chained.resolve(null, null);
+ chained.tryResolve(null, f);
}
}
/**
* {@inheritDoc}
*/
+ @Override
public Promise<T> filter(Predicate<? super T> predicate) {
- return then(new Filter<T>(predicate));
+ PromiseImpl<T> chained = new PromiseImpl<>();
+ onResolve(chained.new Filter(this, predicate));
+ return chained;
}
/**
@@ -371,26 +434,39 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @Immutable
*/
- private static final class Filter<T> implements Success<T, T> {
+ private final class Filter implements Runnable {
+ private final Promise< ? extends T> promise;
private final Predicate<? super T> predicate;
- Filter(Predicate<? super T> predicate) {
+ Filter(Promise< ? extends T> promise, Predicate< ? super T> predicate) {
+ this.promise = promise;
this.predicate = requireNonNull(predicate);
}
- public Promise<T> call(Promise<T> resolved) throws Exception {
- if (predicate.test(resolved.getValue())) {
- return resolved;
+ @Override
+ public void run() {
+ Result<T> result = Result.collect(promise);
+ if (result.fail == null) {
+ try {
+ if (!predicate.test(result.value)) {
+ result.fail = new NoSuchElementException();
+ }
+ } catch (Throwable e) { // propagate new exception
+ result.fail = e;
+ }
}
- throw new NoSuchElementException();
+ tryResolve(result.value, result.fail);
}
}
/**
* {@inheritDoc}
*/
+ @Override
public <R> Promise<R> map(Function<? super T, ? extends R> mapper) {
- return then(new Map<T, R>(mapper));
+ PromiseImpl<R> chained = new PromiseImpl<>();
+ onResolve(chained.new Map<>(this, mapper));
+ return chained;
}
/**
@@ -398,23 +474,39 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @Immutable
*/
- private static final class Map<T, R> implements Success<T, R> {
- private final Function<? super T, ? extends R> mapper;
+ private final class Map<P> implements Runnable {
+ private final Promise< ? extends P> promise;
+ private final Function<? super P, ? extends T> mapper;
- Map(Function<? super T, ? extends R> mapper) {
+ Map(Promise< ? extends P> promise,
+ Function< ? super P, ? extends T> mapper) {
+ this.promise = promise;
this.mapper = requireNonNull(mapper);
}
- public Promise<R> call(Promise<T> resolved) throws Exception {
- return new PromiseImpl<R>(mapper.apply(resolved.getValue()), null);
+ @Override
+ public void run() {
+ Result<P> result = Result.collect(promise);
+ T v = null;
+ if (result.fail == null) {
+ try {
+ v = mapper.apply(result.value);
+ } catch (Throwable e) { // propagate new exception
+ result.fail = e;
+ }
+ }
+ tryResolve(v, result.fail);
}
}
/**
* {@inheritDoc}
*/
+ @Override
public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) {
- return then(new FlatMap<T, R>(mapper));
+ PromiseImpl<R> chained = new PromiseImpl<>();
+ onResolve(chained.new FlatMap<>(this, mapper));
+ return chained;
}
/**
@@ -422,26 +514,42 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @Immutable
*/
- private static final class FlatMap<T, R> implements Success<T, R> {
- private final Function<? super T, Promise<? extends R>> mapper;
+ private final class FlatMap<P> implements Runnable {
+ private final Promise< ? extends P> promise;
+ private final Function< ? super P,Promise< ? extends T>> mapper;
- FlatMap(Function<? super T, Promise<? extends R>> mapper) {
+ FlatMap(Promise< ? extends P> promise,
+ Function< ? super P,Promise< ? extends T>> mapper) {
+ this.promise = promise;
this.mapper = requireNonNull(mapper);
}
- @SuppressWarnings("unchecked")
- public Promise<R> call(Promise<T> resolved) throws Exception {
- return (Promise<R>) mapper.apply(resolved.getValue());
+ @Override
+ public void run() {
+ Result<P> result = Result.collect(promise);
+ if (result.fail == null) {
+ Promise< ? extends T> flatmap = null;
+ try {
+ flatmap = mapper.apply(result.value);
+ } catch (Throwable e) { // propagate new exception
+ result.fail = e;
+ }
+ if (flatmap != null) {
+ flatmap.onResolve(new Chain(flatmap));
+ return;
+ }
+ }
+ tryResolve(null, result.fail);
}
}
/**
* {@inheritDoc}
*/
+ @Override
public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) {
- PromiseImpl<T> chained = new PromiseImpl<T>();
- Recover<T> recover = new Recover<T>(chained, recovery);
- then(recover, recover);
+ PromiseImpl<T> chained = new PromiseImpl<>();
+ onResolve(chained.new Recover(this, recovery));
return chained;
}
@@ -450,52 +558,41 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @Immutable
*/
- private static final class Recover<T> implements Success<T, Void>, Failure {
- private final PromiseImpl<T> chained;
- private final Function<Promise<?>, ? extends T> recovery;
+ private final class Recover implements Runnable {
+ private final Promise<T> promise;
+ private final Function<Promise< ? >, ? extends T> recovery;
- Recover(PromiseImpl<T> chained, Function<Promise<?>, ? extends T> recovery) {
- this.chained = chained;
+ Recover(Promise<T> promise,
+ Function<Promise< ? >, ? extends T> recovery) {
+ this.promise = promise;
this.recovery = requireNonNull(recovery);
}
- public Promise<Void> call(Promise<T> resolved) throws Exception {
- T value;
- try {
- value = resolved.getValue();
- } catch (Throwable e) {
- chained.resolve(null, e);
- return null;
- }
- chained.resolve(value, null);
- return null;
- }
-
- public void fail(Promise<?> resolved) throws Exception {
- T recovered;
- Throwable failure;
- try {
- recovered = recovery.apply(resolved);
- failure = resolved.getFailure();
- } catch (Throwable e) {
- chained.resolve(null, e);
- return;
- }
- if (recovered == null) {
- chained.resolve(null, failure);
- } else {
- chained.resolve(recovered, null);
+ @Override
+ public void run() {
+ Result<T> result = Result.collect(promise);
+ if (result.fail != null) {
+ try {
+ T v = recovery.apply(promise);
+ if (v != null) {
+ result.value = v;
+ result.fail = null;
+ }
+ } catch (Throwable e) { // propagate new exception
+ result.fail = e;
+ }
}
+ tryResolve(result.value, result.fail);
}
}
/**
* {@inheritDoc}
*/
+ @Override
public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) {
- PromiseImpl<T> chained = new PromiseImpl<T>();
- RecoverWith<T> recoverWith = new RecoverWith<T>(chained, recovery);
- then(recoverWith, recoverWith);
+ PromiseImpl<T> chained = new PromiseImpl<>();
+ onResolve(chained.new RecoverWith(this, recovery));
return chained;
}
@@ -504,112 +601,364 @@ final class PromiseImpl<T> implements Promise<T> {
*
* @Immutable
*/
- private static final class RecoverWith<T> implements Success<T, Void>, Failure {
- private final PromiseImpl<T> chained;
+ private final class RecoverWith implements Runnable {
+ private final Promise<T> promise;
private final Function<Promise<?>, Promise<? extends T>> recovery;
- RecoverWith(PromiseImpl<T> chained, Function<Promise<?>, Promise<? extends T>> recovery) {
- this.chained = chained;
+ RecoverWith(Promise<T> promise,
+ Function<Promise< ? >,Promise< ? extends T>> recovery) {
+ this.promise = promise;
this.recovery = requireNonNull(recovery);
}
- public Promise<Void> call(Promise<T> resolved) throws Exception {
- T value;
- try {
- value = resolved.getValue();
- } catch (Throwable e) {
- chained.resolve(null, e);
- return null;
+ @Override
+ public void run() {
+ Result<T> result = Result.collect(promise);
+ if (result.fail != null) {
+ Promise< ? extends T> recovered = null;
+ try {
+ recovered = recovery.apply(promise);
+ } catch (Throwable e) { // propagate new exception
+ result.fail = e;
+ }
+ if (recovered != null) {
+ recovered.onResolve(new Chain(recovered));
+ return;
+ }
}
- chained.resolve(value, null);
- return null;
+ tryResolve(result.value, result.fail);
}
+ }
- public void fail(Promise<?> resolved) throws Exception {
- Promise<? extends T> recovered;
- Throwable failure;
- try {
- recovered = recovery.apply(resolved);
- failure = resolved.getFailure();
- } catch (Throwable e) {
- chained.resolve(null, e);
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Promise<T> fallbackTo(Promise<? extends T> fallback) {
+ PromiseImpl<T> chained = new PromiseImpl<>();
+ onResolve(chained.new FallbackTo(this, fallback));
+ return chained;
+ }
+
+ /**
+ * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method.
+ *
+ * @Immutable
+ */
+ private final class FallbackTo implements Runnable {
+ private final Promise<T> promise;
+ private final Promise<? extends T> fallback;
+
+ FallbackTo(Promise<T> promise, Promise< ? extends T> fallback) {
+ this.promise = promise;
+ this.fallback = requireNonNull(fallback);
+ }
+
+ @Override
+ public void run() {
+ Result<T> result = Result.collect(promise);
+ if (result.fail != null) {
+ fallback.onResolve(new Chain(fallback, result.fail));
return;
}
- if (recovered == null) {
- chained.resolve(null, failure);
- } else {
- recovered.onResolve(new Chain<T>(chained, recovered));
+ tryResolve(result.value, result.fail);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @since 1.1
+ */
+ @Override
+ public Promise<T> timeout(long millis) {
+ PromiseImpl<T> chained = new PromiseImpl<>();
+ if (!isDone()) {
+ onResolve(chained.new Timeout(millis, TimeUnit.MILLISECONDS));
+ }
+ onResolve(chained.new Chain(this));
+ return chained;
+ }
+
+ /**
+ * Timeout class used by the {@link PromiseImpl#timeout(long)} method to
+ * cancel timeout when the Promise is resolved.
+ *
+ * @Immutable
+ * @since 1.1
+ */
+ private final class Timeout implements Runnable {
+ private final ScheduledFuture< ? > future;
+
+ Timeout(long timeout, TimeUnit unit) {
+ future = Callbacks.schedule(new TimeoutAction(), timeout, unit);
+ }
+
+ @Override
+ public void run() {
+ if (future != null) {
+ future.cancel(false);
}
}
}
+
+ /**
+ * Callback used to fail the Promise if the timeout expires.
+ *
+ * @Immutable
+ */
+ private final class TimeoutAction implements Runnable {
+ TimeoutAction() {}
+
+ @Override
+ public void run() {
+ tryResolve(null, new TimeoutException());
+ }
+ }
/**
* {@inheritDoc}
+ *
+ * @since 1.1
*/
- public Promise<T> fallbackTo(Promise<? extends T> fallback) {
- PromiseImpl<T> chained = new PromiseImpl<T>();
- FallbackTo<T> fallbackTo = new FallbackTo<T>(chained, fallback);
- then(fallbackTo, fallbackTo);
+ @Override
+ public Promise<T> delay(long millis) {
+ PromiseImpl<T> chained = new PromiseImpl<>();
+ onResolve(new Delay(chained.new Chain(this), millis,
+ TimeUnit.MILLISECONDS));
return chained;
}
/**
- * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method.
+ * Delay class used by the {@link PromiseImpl#delay(long)} method to delay
+ * chaining a promise.
*
* @Immutable
+ * @since 1.1
*/
- private static final class FallbackTo<T> implements Success<T, Void>, Failure {
- private final PromiseImpl<T> chained;
- private final Promise<? extends T> fallback;
+ private static final class Delay implements Runnable {
+ private final Runnable callback;
+ private final long delay;
+ private final TimeUnit unit;
+
+ Delay(Runnable callback, long delay, TimeUnit unit) {
+ this.callback = callback;
+ this.delay = delay;
+ this.unit = unit;
+ }
- FallbackTo(PromiseImpl<T> chained, Promise<? extends T> fallback) {
- this.chained = chained;
- this.fallback = requireNonNull(fallback);
+ @Override
+ public void run() {
+ Callbacks.schedule(callback, delay, unit);
}
+ }
- public Promise<Void> call(Promise<T> resolved) throws Exception {
- T value;
+ /**
+ * Callback handler used to asynchronously execute callbacks.
+ *
+ * @Immutable
+ * @since 1.1
+ */
+ private static final class Callbacks
+ implements ThreadFactory, RejectedExecutionHandler, Runnable {
+ private static final Callbacks callbacks;
+ private static final ScheduledExecutor scheduledExecutor;
+ private static final ThreadPoolExecutor callbackExecutor;
+ static {
+ callbacks = new Callbacks();
+ scheduledExecutor = new ScheduledExecutor(2, callbacks);
+ callbackExecutor = new ThreadPoolExecutor(0, 64, 60L,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ callbacks, callbacks);
+ }
+
+ /**
+ * Schedule a callback on the scheduled executor
+ */
+ static ScheduledFuture< ? > schedule(Runnable callback, long delay,
+ TimeUnit unit) {
try {
- value = resolved.getValue();
- } catch (Throwable e) {
- chained.resolve(null, e);
+ return scheduledExecutor.schedule(callback, delay, unit);
+ } catch (RejectedExecutionException e) {
+ callbacks.rejectedExecution(callback, scheduledExecutor);
return null;
}
- chained.resolve(value, null);
- return null;
}
- public void fail(Promise<?> resolved) throws Exception {
- Throwable failure;
+ /**
+ * Execute a callback on the callback executor
+ */
+ static void execute(Runnable callback) {
+ callbackExecutor.execute(callback);
+ }
+
+ static void uncaughtException(Throwable t) {
try {
- failure = resolved.getFailure();
- } catch (Throwable e) {
- chained.resolve(null, e);
- return;
+ Thread thread = Thread.currentThread();
+ thread.getUncaughtExceptionHandler().uncaughtException(thread,
+ t);
+ } catch (Throwable ignored) {
+ // we ignore this
}
- fallback.onResolve(new Chain<T>(chained, fallback, failure));
}
- }
- static <V> V requireNonNull(V value) {
- if (value != null) {
- return value;
+ private final AtomicBoolean shutdownHookInstalled;
+ private final ThreadFactory delegateThreadFactory;
+
+ private Callbacks() {
+ shutdownHookInstalled = new AtomicBoolean();
+ delegateThreadFactory = Executors.defaultThreadFactory();
+ }
+
+ /**
+ * Executor threads should not prevent VM from exiting
+ */
+ @Override
+ public Thread newThread(Runnable r) {
+ if (shutdownHookInstalled.compareAndSet(false, true)) {
+ Thread shutdownThread = delegateThreadFactory.newThread(this);
+ shutdownThread.setName("ExecutorShutdownHook,"
+ + shutdownThread.getName());
+ try {
+ Runtime.getRuntime().addShutdownHook(shutdownThread);
+ } catch (IllegalStateException e) {
+ // VM is already shutting down...
+ callbackExecutor.shutdown();
+ scheduledExecutor.shutdown();
+ }
+ }
+ Thread t = delegateThreadFactory.newThread(r);
+ t.setName("PromiseImpl," + t.getName());
+ t.setDaemon(true);
+ return t;
+ }
+
+ /**
+ * Call the callback using the caller's thread because the thread pool
+ * rejected the execution.
+ */
+ @Override
+ public void rejectedExecution(Runnable callback,
+ ThreadPoolExecutor executor) {
+ try {
+ callback.run();
+ } catch (Throwable t) {
+ uncaughtException(t);
+ }
+ }
+
+ /**
+ * Shutdown hook
+ */
+ @Override
+ public void run() {
+ // limit new thread creation
+ callbackExecutor.setMaximumPoolSize(
+ Math.max(1, callbackExecutor.getPoolSize()));
+ // Run all delayed callbacks now
+ scheduledExecutor.shutdown();
+ BlockingQueue<Runnable> queue = scheduledExecutor.getQueue();
+ if (!queue.isEmpty()) {
+ for (Object r : queue.toArray()) {
+ if (r instanceof RunnableScheduledFuture< ? >) {
+ RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r;
+ if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L)
+ && queue.remove(future)) {
+ future.run();
+ scheduledExecutor.afterExecute(future, null);
+ }
+ }
+ }
+ scheduledExecutor.shutdown();
+ }
+ try {
+ scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ // Shutdown callback executor
+ callbackExecutor.shutdown();
+ try {
+ callbackExecutor.awaitTermination(20, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * ScheduledThreadPoolExecutor for scheduled execution.
+ *
+ * @ThreadSafe
+ */
+ private static final class ScheduledExecutor
+ extends ScheduledThreadPoolExecutor {
+ ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) {
+ super(corePoolSize, threadFactory);
+ }
+
+ /**
+ * Handle uncaught exceptions
+ */
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ if ((t == null) && (r instanceof Future< ? >)) {
+ boolean interrupted = Thread.interrupted();
+ try {
+ ((Future< ? >) r).get();
+ } catch (CancellationException e) {
+ // ignore
+ } catch (InterruptedException e) {
+ interrupted = true;
+ } catch (ExecutionException e) {
+ t = e.getCause();
+ } finally {
+ if (interrupted) { // restore interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ if (t != null) {
+ uncaughtException(t);
+ }
+ }
}
- throw new NullPointerException();
}
/**
- * Use the lazy initialization holder class idiom to delay creating a Logger
- * until we actually need it.
+ * A holder of the result of a promise.
+ *
+ * @NotThreadSafe
*/
- private static final class Logger {
- private final static java.util.logging.Logger LOGGER;
- static {
- LOGGER = java.util.logging.Logger.getLogger(PromiseImpl.class.getName());
+ static final class Result<P> {
+ Throwable fail;
+ P value;
+
+ Result() {}
+
+ static <R> Result<R> collect(Promise< ? extends R> promise) {
+ Result<R> result = new Result<>();
+ final boolean interrupted = Thread.interrupted();
+ try {
+ result.fail = promise.getFailure();
+ if (result.fail == null) {
+ result.value = promise.getValue();
+ }
+ } catch (Throwable e) {
+ result.fail = e; // propagate new exception
+ } finally {
+ if (interrupted) { // restore interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+ return result;
}
+ }
- static void logCallbackException(Throwable t) {
- LOGGER.log(java.util.logging.Level.WARNING, "Exception from Promise callback", t);
+ static <V> V requireNonNull(V value) {
+ if (value != null) {
+ return value;
}
+ throw new NullPointerException();
}
}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java
index 06d506675..7067fe50a 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,12 +17,15 @@
package org.osgi.util.promise;
import static org.osgi.util.promise.PromiseImpl.requireNonNull;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.osgi.util.promise.PromiseImpl.Result;
+
/**
* Static helper methods for {@link Promise}s.
*
@@ -42,7 +45,7 @@ public class Promises {
* @return A new Promise that has been resolved with the specified value.
*/
public static <T> Promise<T> resolved(T value) {
- return new PromiseImpl<T>(value, null);
+ return new PromiseImpl<>(value, null);
}
/**
@@ -54,7 +57,7 @@ public class Promises {
* @return A new Promise that has been resolved with the specified failure.
*/
public static <T> Promise<T> failed(Throwable failure) {
- return new PromiseImpl<T>(null, requireNonNull(failure));
+ return new PromiseImpl<>(null, requireNonNull(failure));
}
/**
@@ -85,13 +88,14 @@ public class Promises {
*/
public static <T, S extends T> Promise<List<T>> all(Collection<Promise<S>> promises) {
if (promises.isEmpty()) {
- List<T> result = new ArrayList<T>();
+ List<T> result = new ArrayList<>();
return resolved(result);
}
/* make a copy and capture the ordering */
- List<Promise<? extends T>> list = new ArrayList<Promise<? extends T>>(promises);
- PromiseImpl<List<T>> chained = new PromiseImpl<List<T>>();
- All<T> all = new All<T>(chained, list);
+ List<Promise< ? extends T>> list = new ArrayList<Promise< ? extends T>>(
+ promises);
+ PromiseImpl<List<T>> chained = new PromiseImpl<>();
+ All<T> all = new All<>(chained, list);
for (Promise<? extends T> promise : list) {
promise.onResolve(all);
}
@@ -121,6 +125,7 @@ public class Promises {
* {@link FailedPromisesException} must contain all of the specified
* Promises which resolved with a failure.
*/
+ @SafeVarargs
public static <T> Promise<List<T>> all(Promise<? extends T>... promises) {
@SuppressWarnings("unchecked")
List<Promise<T>> list = Arrays.asList((Promise<T>[]) promises);
@@ -144,36 +149,30 @@ public class Promises {
this.promiseCount = new AtomicInteger(promises.size());
}
+ @Override
public void run() {
if (promiseCount.decrementAndGet() != 0) {
return;
}
- List<T> result = new ArrayList<T>(promises.size());
- List<Promise<?>> failed = new ArrayList<Promise<?>>(promises.size());
+ List<T> value = new ArrayList<>(promises.size());
+ List<Promise<?>> failed = new ArrayList<>(promises.size());
Throwable cause = null;
for (Promise<? extends T> promise : promises) {
- Throwable failure;
- T value;
- try {
- failure = promise.getFailure();
- value = (failure != null) ? null : promise.getValue();
- } catch (Throwable e) {
- chained.resolve(null, e);
- return;
- }
- if (failure != null) {
+ Result<T> result = Result.collect(promise);
+ if (result.fail != null) {
failed.add(promise);
if (cause == null) {
- cause = failure;
+ cause = result.fail;
}
} else {
- result.add(value);
+ value.add(result.value);
}
}
if (failed.isEmpty()) {
- chained.resolve(result, null);
+ chained.tryResolve(value, null);
} else {
- chained.resolve(null, new FailedPromisesException(failed, cause));
+ chained.tryResolve(null,
+ new FailedPromisesException(failed, cause));
}
}
}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Success.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Success.java
index c29fc4fb6..cedefa1a5 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Success.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Success.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2015). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@ import org.osgi.annotation.versioning.ConsumerType;
* @author $Id$
*/
@ConsumerType
+@FunctionalInterface
public interface Success<T, R> {
/**
* Success callback for a Promise.
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/TimeoutException.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/TimeoutException.java
new file mode 100644
index 000000000..09186f552
--- /dev/null
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/TimeoutException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) OSGi Alliance (2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.promise;
+
+/**
+ * Timeout exception for a Promise.
+ *
+ * @since 1.1
+ * @author $Id$
+ */
+public class TimeoutException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Create a new {@code TimeoutException}.
+ */
+ public TimeoutException() {
+ super();
+ }
+}
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/package-info.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/package-info.java
index 5a3ec65d3..56e2d8e4b 100644
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/package-info.java
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/package-info.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2014, 2016). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,25 +15,23 @@
*/
/**
- * Promise Package Version 1.0.
- *
+ * Promise Package Version 1.1.
* <p>
* Bundles wishing to use this package must list the package in the
* Import-Package header of the bundle's manifest.
- *
* <p>
* Example import for consumers using the API in this package:
* <p>
- * {@code Import-Package: org.osgi.util.promise; version="[1.0,2.0)"}
+ * {@code Import-Package: org.osgi.util.promise; version="[1.1,2.0)"}
* <p>
* Example import for providers implementing the API in this package:
* <p>
- * {@code Import-Package: org.osgi.util.promise; version="[1.0,1.1)"}
+ * {@code Import-Package: org.osgi.util.promise; version="[1.1,1.2)"}
*
* @author $Id$
*/
-@Version("1.0")
+@Version("1.1")
package org.osgi.util.promise;
import org.osgi.annotation.versioning.Version;
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/packageinfo b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/packageinfo
deleted file mode 100644
index 7c8de0324..000000000
--- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/packageinfo
+++ /dev/null
@@ -1 +0,0 @@
-version 1.0

Back to the top