Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java')
-rw-r--r--bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java474
1 files changed, 474 insertions, 0 deletions
diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java
new file mode 100644
index 000000000..09c0c038f
--- /dev/null
+++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java
@@ -0,0 +1,474 @@
+/*
+ * Copyright (c) OSGi Alliance (2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.promise;
+
+import static org.osgi.util.promise.PromiseImpl.uncaughtException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.osgi.annotation.versioning.ConsumerType;
+import org.osgi.util.promise.PromiseImpl.Result;
+
+/**
+ * Promise factory to create Deferred and Promise objects.
+ * <p>
+ * Instances of this class can be used to create Deferred and Promise objects
+ * which use the executors used to construct this object for any callback or
+ * scheduled operation execution.
+ *
+ * @Immutable
+ * @author $Id$
+ * @since 1.1
+ */
+@ConsumerType
+public class PromiseFactory {
+ /**
+ * The default factory which uses the default callback executor and default
+ * scheduled executor.
+ */
+ final static PromiseFactory defaultFactory = new PromiseFactory(
+ null, null);
+
+ /**
+ * The executor to use for callbacks. If {@code null}, the default
+ * callback executor is used.
+ */
+ private final Executor callbackExecutor;
+ /**
+ * The executor to use for scheduled operations. If {@code null}, the
+ * default scheduled executor is used.
+ */
+ private final ScheduledExecutorService scheduledExecutor;
+
+
+ /**
+ * Create a new PromiseFactory with the specified callback executor.
+ * <p>
+ * The default scheduled executor will be used.
+ *
+ * @param callbackExecutor The executor to use for callbacks. {@code null}
+ * can be specified for the default callback executor.
+ */
+ public PromiseFactory(Executor callbackExecutor) {
+ this(callbackExecutor, null);
+ }
+
+ /**
+ * Create a new PromiseFactory with the specified callback executor and
+ * specified scheduled executor.
+ *
+ * @param callbackExecutor The executor to use for callbacks. {@code null}
+ * can be specified for the default callback executor.
+ * @param scheduledExecutor The scheduled executor for use for scheduled
+ * operations. {@code null} can be specified for the default
+ * scheduled executor.
+ */
+ public PromiseFactory(Executor callbackExecutor,
+ ScheduledExecutorService scheduledExecutor) {
+ this.callbackExecutor = callbackExecutor;
+ this.scheduledExecutor = scheduledExecutor;
+ }
+
+ /**
+ * Returns the executor to use for callbacks.
+ *
+ * @return The executor to use for callbacks. This will be the default
+ * callback executor if {@code null} was specified for the callback
+ * executor when this PromiseFactory was created.
+ */
+ public Executor executor() {
+ if (callbackExecutor == null) {
+ return DefaultExecutors.callbackExecutor();
+ }
+ return callbackExecutor;
+ }
+
+ /**
+ * Returns the scheduled executor to use for scheduled operations.
+ *
+ * @return The scheduled executor to use for scheduled operations. This will
+ * be the default scheduled executor if {@code null} was specified
+ * for the scheduled executor when this PromiseFactory was created.
+ */
+ public ScheduledExecutorService scheduledExecutor() {
+ if (scheduledExecutor == null) {
+ return DefaultExecutors.scheduledExecutor();
+ }
+ return scheduledExecutor;
+ }
+
+ /**
+ * Create a new Deferred with the callback executor and scheduled executor
+ * of this PromiseFactory object.
+ * <p>
+ * Use this method instead of {@link Deferred#Deferred()} to create a new
+ * {@link Deferred} whose associated Promise uses executors other than the
+ * default executors.
+ *
+ * @param <T> The value type associated with the returned Deferred.
+ * @return A new {@link Deferred} with the callback and scheduled executors
+ * of this PromiseFactory object
+ */
+ public <T> Deferred<T> deferred() {
+ return new Deferred<>(this);
+ }
+
+ /**
+ * Returns a new Promise that has been resolved with the specified value.
+ * <p>
+ * The returned Promise uses the callback executor and scheduled executor of
+ * this PromiseFactory object
+ * <p>
+ * Use this method instead of {@link Promises#resolved(Object)} to create a
+ * Promise which uses executors other than the default executors.
+ *
+ * @param <T> The value type associated with the returned Promise.
+ * @param value The value of the resolved Promise.
+ * @return A new Promise that has been resolved with the specified value.
+ */
+ public <T> Promise<T> resolved(T value) {
+ return new ResolvedPromiseImpl<>(value, this);
+ }
+
+ /**
+ * Returns a new Promise that has been resolved with the specified failure.
+ * <p>
+ * The returned Promise uses the callback executor and scheduled executor of
+ * this PromiseFactory object
+ * <p>
+ * Use this method instead of {@link Promises#failed(Throwable)} to create a
+ * Promise which uses executors other than the default executors.
+ *
+ * @param <T> The value type associated with the returned Promise.
+ * @param failure The failure of the resolved Promise. Must not be
+ * {@code null}.
+ * @return A new Promise that has been resolved with the specified failure.
+ */
+ public <T> Promise<T> failed(Throwable failure) {
+ return new FailedPromiseImpl<>(failure, this);
+ }
+
+ /**
+ * Returns a new Promise that will hold the result of the specified task.
+ * <p>
+ * The returned Promise uses the callback executor and scheduled executor of
+ * this PromiseFactory object
+ * <p>
+ * The specified task will be executed on the {@link #executor() callback
+ * executor}.
+ *
+ * @param <T> The value type associated with the returned Promise.
+ * @param task The task whose result will be available from the returned
+ * Promise.
+ * @return A new Promise that will hold the result of the specified task.
+ */
+ public <T> Promise<T> submit(Callable< ? extends T> task) {
+ DeferredPromiseImpl<T> promise = new DeferredPromiseImpl<>(this);
+ Runnable submit = promise.new Submit(task);
+ try {
+ executor().execute(submit);
+ } catch (Exception t) {
+ promise.tryResolve(null, t);
+ }
+ return promise;
+ }
+
+ /**
+ * Returns a new Promise that is a latch on the resolution of the specified
+ * Promises.
+ * <p>
+ * The returned Promise uses the callback executor and scheduled executor of
+ * this PromiseFactory object
+ * <p>
+ * The returned Promise acts as a gate and must be resolved after all of the
+ * specified Promises are resolved.
+ *
+ * @param <T> The value type of the List value associated with the returned
+ * Promise.
+ * @param <S> A subtype of the value type of the List value associated with
+ * the returned Promise.
+ * @param promises The Promises which must be resolved before the returned
+ * Promise must be resolved. Must not be {@code null} and all of
+ * the elements in the collection must not be {@code null}.
+ * @return A Promise that must be successfully resolved with a List of the
+ * values in the order of the specified Promises if all the
+ * specified Promises are successfully resolved. The List in the
+ * returned Promise is the property of the caller and is modifiable.
+ * The returned Promise must be resolved with a failure of
+ * {@link FailedPromisesException} if any of the specified Promises
+ * are resolved with a failure. The failure
+ * {@link FailedPromisesException} must contain all of the specified
+ * Promises which resolved with a failure.
+ */
+ public <T, S extends T> Promise<List<T>> all(
+ Collection<Promise<S>> promises) {
+ if (promises.isEmpty()) {
+ List<T> result = new ArrayList<>();
+ return resolved(result);
+ }
+
+ DeferredPromiseImpl<List<T>> promise = new DeferredPromiseImpl<>(this);
+ /* make a copy and capture the ordering */
+ List<Promise<S>> list = new ArrayList<>(promises);
+ All<T,S> all = new All<>(promise, list);
+ for (Promise<S> p : list) {
+ p.onResolve(all);
+ }
+ return promise;
+ }
+
+ /**
+ * A callback used to resolve the specified Promise when the specified list
+ * of Promises are resolved for the {@link PromiseFactory#all(Collection)}
+ * method.
+ *
+ * @ThreadSafe
+ */
+ private static final class All<T, S extends T> implements Runnable {
+ private final DeferredPromiseImpl<List<T>> promise;
+ private final List<Promise<S>> promises;
+ private final AtomicInteger promiseCount;
+
+ All(DeferredPromiseImpl<List<T>> promise,
+ List<Promise<S>> promises) {
+ this.promise = promise;
+ this.promises = promises;
+ this.promiseCount = new AtomicInteger(promises.size());
+ }
+
+ @Override
+ public void run() {
+ if (promiseCount.decrementAndGet() != 0) {
+ return;
+ }
+ List<T> value = new ArrayList<>(promises.size());
+ List<Promise< ? >> failed = new ArrayList<>(promises.size());
+ Throwable cause = null;
+ for (Promise<S> p : promises) {
+ Result<S> result = PromiseImpl.collect(p);
+ if (result.fail != null) {
+ failed.add(p);
+ if (cause == null) {
+ cause = result.fail;
+ }
+ } else {
+ value.add(result.value);
+ }
+ }
+ if (failed.isEmpty()) {
+ promise.tryResolve(value, null);
+ } else {
+ promise.tryResolve(null,
+ new FailedPromisesException(failed, cause));
+ }
+ }
+ }
+
+ /**
+ * Returns an Executor implementation that executes tasks immediately on the
+ * thread calling the {@code Executor.execute} method.
+ *
+ * @return An Executor implementation that executes tasks immediately on the
+ * thread calling the {@code Executor.execute} method.
+ */
+ public static Executor inlineExecutor() {
+ return new InlineExecutor();
+ }
+
+ /**
+ * An Executor implementation which executes the task immediately on the
+ * thread calling the {@code Executor.execute} method.
+ *
+ * @Immutable
+ */
+ private static final class InlineExecutor implements Executor {
+ InlineExecutor() {}
+
+ @Override
+ public void execute(Runnable callback) {
+ callback.run();
+ }
+ }
+
+ /**
+ * Default executors for Promises.
+ *
+ * @Immutable
+ */
+ private static final class DefaultExecutors
+ implements ThreadFactory, RejectedExecutionHandler, Runnable {
+ private static final DefaultExecutors callbacks;
+ private static final ScheduledExecutor scheduledExecutor;
+ private static final ThreadPoolExecutor callbackExecutor;
+ static {
+ callbacks = new DefaultExecutors();
+ scheduledExecutor = new ScheduledExecutor(2, callbacks);
+ callbackExecutor = new ThreadPoolExecutor(0, 64, 60L,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ callbacks, callbacks);
+ }
+
+ static Executor callbackExecutor() {
+ return callbackExecutor;
+ }
+
+ static ScheduledExecutorService scheduledExecutor() {
+ return scheduledExecutor;
+ }
+
+ private final AtomicBoolean shutdownHookInstalled;
+ private final ThreadFactory delegateThreadFactory;
+
+ private DefaultExecutors() {
+ shutdownHookInstalled = new AtomicBoolean();
+ delegateThreadFactory = Executors.defaultThreadFactory();
+ }
+
+ /**
+ * Executor threads should not prevent VM from exiting
+ */
+ @Override
+ public Thread newThread(Runnable r) {
+ if (shutdownHookInstalled.compareAndSet(false, true)) {
+ Thread shutdownThread = delegateThreadFactory.newThread(this);
+ shutdownThread.setName(
+ "ExecutorShutdownHook," + shutdownThread.getName());
+ try {
+ Runtime.getRuntime().addShutdownHook(shutdownThread);
+ } catch (IllegalStateException e) {
+ // VM is already shutting down...
+ callbackExecutor.shutdown();
+ scheduledExecutor.shutdown();
+ }
+ }
+ Thread t = delegateThreadFactory.newThread(r);
+ t.setName("PromiseFactory," + t.getName());
+ t.setDaemon(true);
+ return t;
+ }
+
+ /**
+ * Call the callback using the caller's thread because the thread pool
+ * rejected the execution.
+ */
+ @Override
+ public void rejectedExecution(Runnable callback,
+ ThreadPoolExecutor executor) {
+ try {
+ callback.run();
+ } catch (Throwable t) {
+ uncaughtException(t);
+ }
+ }
+
+ /**
+ * Shutdown hook
+ */
+ @Override
+ public void run() {
+ // limit new thread creation
+ callbackExecutor.setMaximumPoolSize(
+ Math.max(1, callbackExecutor.getPoolSize()));
+ // Run all delayed callbacks now
+ scheduledExecutor.shutdown();
+ BlockingQueue<Runnable> queue = scheduledExecutor.getQueue();
+ if (!queue.isEmpty()) {
+ for (Object r : queue.toArray()) {
+ if (r instanceof RunnableScheduledFuture< ? >) {
+ RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r;
+ if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L)
+ && queue.remove(future)) {
+ future.run();
+ scheduledExecutor.afterExecute(future, null);
+ }
+ }
+ }
+ scheduledExecutor.shutdown();
+ }
+ try {
+ scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ // Shutdown callback executor
+ callbackExecutor.shutdown();
+ try {
+ callbackExecutor.awaitTermination(20, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * ScheduledThreadPoolExecutor for scheduled execution.
+ *
+ * @ThreadSafe
+ */
+ private static final class ScheduledExecutor
+ extends ScheduledThreadPoolExecutor {
+ ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) {
+ super(corePoolSize, threadFactory);
+ }
+
+ /**
+ * Handle uncaught exceptions
+ */
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ if ((t == null) && (r instanceof Future< ? >)) {
+ boolean interrupted = Thread.interrupted();
+ try {
+ ((Future< ? >) r).get();
+ } catch (CancellationException e) {
+ // ignore
+ } catch (InterruptedException e) {
+ interrupted = true;
+ } catch (ExecutionException e) {
+ t = e.getCause();
+ } finally {
+ if (interrupted) { // restore interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ if (t != null) {
+ uncaughtException(t);
+ }
+ }
+ }
+ }
+}

Back to the top