diff options
12 files changed, 1076 insertions, 786 deletions
diff --git a/bundles/org.eclipse.osgi.services/src/org/osgi/service/log/Logger.java b/bundles/org.eclipse.osgi.services/src/org/osgi/service/log/Logger.java index a9cc36a67..3424a62fd 100644 --- a/bundles/org.eclipse.osgi.services/src/org/osgi/service/log/Logger.java +++ b/bundles/org.eclipse.osgi.services/src/org/osgi/service/log/Logger.java @@ -106,6 +106,15 @@ public interface Logger { void trace(String format, Object... arguments); /** + * Perform the specified operation if logging enabled for the + * {@link LogLevel#TRACE} level. + * + * @param consumer The operation to perform on this Logger. + * @throws E An exception thrown by the operation. + */ + <E extends Exception> void trace(LoggerConsumer<E> consumer) throws E; + + /** * Is logging enabled for the {@link LogLevel#DEBUG} level? * * @return {@code true} if logging is enabled for the {@link LogLevel#DEBUG @@ -146,6 +155,15 @@ public interface Logger { void debug(String format, Object... arguments); /** + * Perform the specified operation if logging enabled for the + * {@link LogLevel#DEBUG} level. + * + * @param consumer The operation to perform on this Logger. + * @throws E An exception thrown by the operation. + */ + <E extends Exception> void debug(LoggerConsumer<E> consumer) throws E; + + /** * Is logging enabled for the {@link LogLevel#INFO} level? * * @return {@code true} if logging is enabled for the {@link LogLevel#INFO @@ -186,6 +204,15 @@ public interface Logger { void info(String format, Object... arguments); /** + * Perform the specified operation if logging enabled for the + * {@link LogLevel#INFO} level. + * + * @param consumer The operation to perform on this Logger. + * @throws E An exception thrown by the operation. + */ + <E extends Exception> void info(LoggerConsumer<E> consumer) throws E; + + /** * Is logging enabled for the {@link LogLevel#WARN} level? * * @return {@code true} if logging is enabled for the {@link LogLevel#WARN @@ -226,6 +253,15 @@ public interface Logger { void warn(String format, Object... arguments); /** + * Perform the specified operation if logging enabled for the + * {@link LogLevel#WARN} level. + * + * @param consumer The operation to perform on this Logger. + * @throws E An exception thrown by the operation. + */ + <E extends Exception> void warn(LoggerConsumer<E> consumer) throws E; + + /** * Is logging enabled for the {@link LogLevel#ERROR} level? * * @return {@code true} if logging is enabled for the {@link LogLevel#ERROR @@ -266,6 +302,15 @@ public interface Logger { void error(String format, Object... arguments); /** + * Perform the specified operation if logging enabled for the + * {@link LogLevel#ERROR} level. + * + * @param consumer The operation to perform on this Logger. + * @throws E An exception thrown by the operation. + */ + <E extends Exception> void error(LoggerConsumer<E> consumer) throws E; + + /** * Log a message at the {@link LogLevel#AUDIT} level. * * @param message The message to log. diff --git a/bundles/org.eclipse.osgi.services/src/org/osgi/service/log/LoggerConsumer.java b/bundles/org.eclipse.osgi.services/src/org/osgi/service/log/LoggerConsumer.java new file mode 100644 index 000000000..332fa18a8 --- /dev/null +++ b/bundles/org.eclipse.osgi.services/src/org/osgi/service/log/LoggerConsumer.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) OSGi Alliance (2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.service.log; + +import org.osgi.annotation.versioning.ConsumerType; + +/** + * An operation that accepts a {@link Logger} argument and produces no result. + * <p> + * This is a functional interface and can be used as the assignment target for a + * lambda expression or method reference. + * + * @param <E> The type of the exception that may be thrown. + * @ThreadSafe + * @since 1.4 + * @author $Id$ + */ +@ConsumerType +@FunctionalInterface +public interface LoggerConsumer<E extends Exception> { + /** + * Perform this operation on the specified {@link Logger}. + * + * @param l The {@link Logger} input to this operation. + * @throws E An exception thrown by the operation. + */ + void accept(Logger l) throws E; +} diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java index e2c26cb80..6ab8f936f 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Deferred.java @@ -45,7 +45,7 @@ public class Deferred<T> { /** * The Promise associated with this Deferred. */ - private final PromiseImpl<T> promise; + private final DeferredPromiseImpl<T> promise; /** * Create a new Deferred. @@ -53,22 +53,21 @@ public class Deferred<T> { * The {@link #getPromise() associated promise} will use the default * callback executor and default scheduled executor. * - * @see PromiseExecutors#deferred() + * @see PromiseFactory#deferred() */ public Deferred() { - this(PromiseExecutors.defaultExecutors); + this(PromiseFactory.defaultFactory); } /** * Create a new Deferred with the specified callback and scheduled * executors. * - * @param executors The executors to use for callbacks and scheduled - * operations. + * @param factory The factory to use for callbacks and scheduled operations. * @since 1.1 */ - Deferred(PromiseExecutors executors) { - promise = new PromiseImpl<>(executors); + Deferred(PromiseFactory factory) { + promise = new DeferredPromiseImpl<>(factory); } /** diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/DeferredPromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/DeferredPromiseImpl.java new file mode 100644 index 000000000..5f103429f --- /dev/null +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/DeferredPromiseImpl.java @@ -0,0 +1,589 @@ +/* + * Copyright (c) OSGi Alliance (2014, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.promise; + +import static java.util.Objects.requireNonNull; + +import java.lang.reflect.InvocationTargetException; +import java.util.NoSuchElementException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; + +import org.osgi.util.function.Consumer; +import org.osgi.util.function.Function; +import org.osgi.util.function.Predicate; + +/** + * Deferred Promise implementation. + * <p> + * This class is not used directly by clients. Clients should use + * {@link PromiseFactory#deferred()} to create a {@link Deferred} which can be + * used to obtain a Promise whose resolution can be deferred. + * + * @param <T> The result type associated with the Promise. + * @since 1.1 + * @ThreadSafe + * @author $Id$ + */ +final class DeferredPromiseImpl<T> extends PromiseImpl<T> { + /** + * A CountDownLatch to manage the resolved state of this Promise. + * <p> + * This object is used as the synchronizing object to provide a critical + * section in {@link #tryResolve(Object, Throwable)} so that only a single + * thread can write the resolved state variables and open the latch. + * <p> + * The resolved state variables, {@link #value} and {@link #fail}, must only + * be written when the latch is closed (getCount() != 0) and must only be + * read when the latch is open (getCount() == 0). The latch state must + * always be checked before writing or reading since the resolved state + * variables' memory consistency is guarded by the latch. + */ + private final CountDownLatch resolved; + + /** + * The value of this Promise if successfully resolved. + * + * @see resolved + */ + // @GuardedBy("resolved") + private T value; + + /** + * The failure of this Promise if resolved with a failure or {@code null} if + * successfully resolved. + * + * @see resolved + */ + // @GuardedBy("resolved") + private Throwable fail; + + /** + * Initialize this Promise. + * + * @param factory The factory to use for callbacks and scheduled operations. + */ + DeferredPromiseImpl(PromiseFactory factory) { + super(factory); + resolved = new CountDownLatch(1); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isDone() { + return resolved.getCount() == 0; + } + + /** + * {@inheritDoc} + */ + @Override + public T getValue() throws InvocationTargetException, InterruptedException { + resolved.await(); + if (fail == null) { + return value; + } + throw new InvocationTargetException(fail); + } + + /** + * {@inheritDoc} + */ + @Override + public Throwable getFailure() throws InterruptedException { + resolved.await(); + return fail; + } + + /** + * Return a holder of the result of this PromiseImpl. + */ + @Override + Result<T> collect() { + // ensure latch open before reading state + if (!isDone()) { + return new Result<T>(new AssertionError("promise not resolved")); + } + if (fail == null) { + return new Result<T>(value); + } + return new Result<T>(fail); + } + + @Override + public String toString() { + // ensure latch open before reading state + if (!isDone()) { + return super.toString() + "[unresolved]"; + } + if (fail == null) { + return super.toString() + "[resolved: " + value + "]"; + } + return super.toString() + "[failed: " + fail + "]"; + } + + /** + * Try to resolve this Promise. + * <p> + * If this Promise was already resolved, return false. Otherwise, resolve + * this Promise and return true. + * + * @param v The value of this Promise. + * @param f The failure of this Promise. + * @return false if this Promise was already resolved; true if this method + * resolved this Promise. + */ + boolean tryResolve(T v, Throwable f) { + // critical section: only one resolver at a time + synchronized (resolved) { + if (isDone()) { + return false; + } + /* + * The resolved state variables must be set before opening the + * latch. This safely publishes them to be read by other threads + * that must verify the latch is open before reading. + */ + if (f == null) { + value = v; + } else { + fail = f; + } + resolved.countDown(); + } + notifyCallbacks(); // call any registered callbacks + return true; + } + + /** + * Resolve this Promise. + * <p> + * If this Promise was already resolved, throw IllegalStateException. + * Otherwise, resolve this Promise. + * + * @param v The value of this Promise. + * @param f The failure of this Promise. + * @throws IllegalStateException If this Promise was already resolved. + */ + void resolve(T v, Throwable f) { + if (!tryResolve(v, f)) { + throw new IllegalStateException("Already resolved"); + } + } + + /** + * Resolve this Promise with the specified Promise. + * <p> + * If the specified Promise is successfully resolved, this Promise is + * resolved with the value of the specified Promise. If the specified + * Promise is resolved with a failure, this Promise is resolved with the + * failure of the specified Promise. + * + * @param with A Promise whose value or failure must be used to resolve this + * Promise. Must not be {@code null}. + * @return A Promise that is resolved only when this Promise is resolved by + * the specified Promise. The returned Promise must be successfully + * resolved with the value {@code null}, if this Promise was + * resolved by the specified Promise. The returned Promise must be + * resolved with a failure of {@link IllegalStateException}, if this + * Promise was already resolved when the specified Promise was + * resolved. + */ + Promise<Void> resolveWith(Promise< ? extends T> with) { + DeferredPromiseImpl<Void> chained = deferred(); + with.onResolve(chained.new ResolveWith<>(this, with)); + return chained; + } + + /** + * A callback used to resolve a Promise with another Promise for the + * {@link #resolveWith(Promise)} method. + * + * @Immutable + */ + private final class ResolveWith<P> implements Runnable { + private final DeferredPromiseImpl<P> promise; + private final Promise< ? extends P> with; + + ResolveWith(DeferredPromiseImpl<P> promise, + Promise< ? extends P> with) { + this.promise = promise; + this.with = requireNonNull(with); + } + + @Override + public void run() { + Throwable f = null; + Result<P> result = collect(with); + try { + promise.resolve(result.value, result.fail); + } catch (Throwable e) { + f = e; // propagate new exception + } + tryResolve(null, f); + } + } + + /** + * A callback used to chain promises for the + * {@link PromiseImpl#then(Success, Failure)} method. + * + * @Immutable + */ + final class Then<P> implements Runnable { + private final PromiseImpl<P> promise; + private final Success<P, ? extends T> success; + private final Failure failure; + + @SuppressWarnings("unchecked") + Then(PromiseImpl<P> promise, Success< ? super P, ? extends T> success, + Failure failure) { + this.promise = promise; + this.success = (Success<P, ? extends T>) success; + this.failure = failure; + } + + @Override + public void run() { + Result<P> result = promise.collect(); + if (result.fail != null) { + if (failure != null) { + try { + failure.fail(promise); + } catch (Throwable e) { + result.fail = e; // propagate new exception + } + } + } else if (success != null) { + Promise< ? extends T> returned = null; + try { + returned = success.call(promise); + } catch (Throwable e) { + result.fail = e; // propagate new exception + } + if (returned != null) { + // resolve chained when returned promise is resolved + returned.onResolve(new Chain(returned)); + return; + } + } + tryResolve(null, result.fail); + } + } + + /** + * A callback used to resolve the chained Promise when the Promise is + * resolved. + * + * @Immutable + */ + private final class Chain implements Runnable { + private final Promise< ? extends T> promise; + + Chain(Promise< ? extends T> promise) { + this.promise = promise; + } + + @Override + public void run() { + Result<T> result = collect(promise); + tryResolve(result.value, result.fail); + } + } + + /** + * A callback used to resolve the chained Promise when the PromiseImpl is + * resolved. + * + * @Immutable + */ + final class ChainImpl implements Runnable { + private final PromiseImpl<T> promise; + + ChainImpl(PromiseImpl<T> promise) { + this.promise = promise; + } + + @Override + public void run() { + Result<T> result = promise.collect(); + tryResolve(result.value, result.fail); + } + } + + /** + * A callback used by the {@link PromiseImpl#thenAccept(Consumer)} method. + * + * @Immutable + */ + final class ThenAccept implements Runnable { + private final PromiseImpl<T> promise; + private final Consumer< ? super T> consumer; + + ThenAccept(PromiseImpl<T> promise, Consumer< ? super T> consumer) { + this.promise = promise; + this.consumer = requireNonNull(consumer); + } + + @Override + public void run() { + Result<T> result = promise.collect(); + if (result.fail == null) { + try { + consumer.accept(result.value); + } catch (Throwable e) { + result.fail = e; + } + } + tryResolve(result.value, result.fail); + } + } + + /** + * A callback used by the {@link PromiseImpl#filter(Predicate)} method. + * + * @Immutable + */ + final class Filter implements Runnable { + private final PromiseImpl<T> promise; + private final Predicate< ? super T> predicate; + + Filter(PromiseImpl<T> promise, Predicate< ? super T> predicate) { + this.promise = promise; + this.predicate = requireNonNull(predicate); + } + + @Override + public void run() { + Result<T> result = promise.collect(); + if (result.fail == null) { + try { + if (!predicate.test(result.value)) { + result.fail = new NoSuchElementException(); + } + } catch (Throwable e) { // propagate new exception + result.fail = e; + } + } + tryResolve(result.value, result.fail); + } + } + + /** + * A callback used by the {@link PromiseImpl#map(Function)} method. + * + * @Immutable + */ + final class Map<P> implements Runnable { + private final PromiseImpl<P> promise; + private final Function< ? super P, ? extends T> mapper; + + Map(PromiseImpl<P> promise, Function< ? super P, ? extends T> mapper) { + this.promise = promise; + this.mapper = requireNonNull(mapper); + } + + @Override + public void run() { + Result<P> result = promise.collect(); + T v = null; + if (result.fail == null) { + try { + v = mapper.apply(result.value); + } catch (Throwable e) { // propagate new exception + result.fail = e; + } + } + tryResolve(v, result.fail); + } + } + + /** + * A callback used by the {@link PromiseImpl#flatMap(Function)} method. + * + * @Immutable + */ + final class FlatMap<P> implements Runnable { + private final PromiseImpl<P> promise; + private final Function< ? super P,Promise< ? extends T>> mapper; + + FlatMap(PromiseImpl<P> promise, + Function< ? super P,Promise< ? extends T>> mapper) { + this.promise = promise; + this.mapper = requireNonNull(mapper); + } + + @Override + public void run() { + Result<P> result = promise.collect(); + if (result.fail == null) { + Promise< ? extends T> flatmap = null; + try { + flatmap = mapper.apply(result.value); + } catch (Throwable e) { // propagate new exception + result.fail = e; + } + if (flatmap != null) { + flatmap.onResolve(new Chain(flatmap)); + return; + } + } + tryResolve(null, result.fail); + } + } + + /** + * A callback used by the {@link PromiseImpl#recover(Function)} method. + * + * @Immutable + */ + final class Recover implements Runnable { + private final PromiseImpl<T> promise; + private final Function<Promise< ? >, ? extends T> recovery; + + Recover(PromiseImpl<T> promise, + Function<Promise< ? >, ? extends T> recovery) { + this.promise = promise; + this.recovery = requireNonNull(recovery); + } + + @Override + public void run() { + Result<T> result = promise.collect(); + if (result.fail != null) { + try { + T v = recovery.apply(promise); + if (v != null) { + result.value = v; + result.fail = null; + } + } catch (Throwable e) { // propagate new exception + result.fail = e; + } + } + tryResolve(result.value, result.fail); + } + } + + /** + * A callback used by the {@link PromiseImpl#recoverWith(Function)} method. + * + * @Immutable + */ + final class RecoverWith implements Runnable { + private final PromiseImpl<T> promise; + private final Function<Promise< ? >,Promise< ? extends T>> recovery; + + RecoverWith(PromiseImpl<T> promise, + Function<Promise< ? >,Promise< ? extends T>> recovery) { + this.promise = promise; + this.recovery = requireNonNull(recovery); + } + + @Override + public void run() { + Result<T> result = promise.collect(); + if (result.fail != null) { + Promise< ? extends T> recovered = null; + try { + recovered = recovery.apply(promise); + } catch (Throwable e) { // propagate new exception + result.fail = e; + } + if (recovered != null) { + recovered.onResolve(new Chain(recovered)); + return; + } + } + tryResolve(result.value, result.fail); + } + } + + /** + * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method. + * + * @Immutable + */ + final class FallbackTo implements Runnable { + private final PromiseImpl<T> promise; + private final Promise< ? extends T> fallback; + + FallbackTo(PromiseImpl<T> promise, Promise< ? extends T> fallback) { + this.promise = promise; + this.fallback = requireNonNull(fallback); + } + + @Override + public void run() { + Result<T> result = promise.collect(); + if (result.fail != null) { + fallback.onResolve(new FallbackChain(fallback, result.fail)); + return; + } + tryResolve(result.value, null); + } + } + + /** + * A callback used to resolve the chained Promise when the fallback Promise + * is resolved. + * + * @Immutable + */ + private final class FallbackChain implements Runnable { + private final Promise< ? extends T> fallback; + private final Throwable failure; + + FallbackChain(Promise< ? extends T> fallback, Throwable failure) { + this.fallback = fallback; + this.failure = failure; + } + + @Override + public void run() { + Result<T> result = collect(fallback); + if (result.fail != null) { + result.fail = failure; + } + tryResolve(result.value, result.fail); + } + } + + /** + * A callback used by the {@link PromiseFactory#submit(Callable)} method. + * + * @Immutable + */ + final class Submit implements Runnable { + private final Callable< ? extends T> task; + + Submit(Callable< ? extends T> task) { + this.task = requireNonNull(task); + } + + @Override + public void run() { + try { + tryResolve(task.call(), null); + } catch (Throwable t) { + tryResolve(null, t); + } + } + } +} diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/FailedPromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/FailedPromiseImpl.java new file mode 100644 index 000000000..b2e757f45 --- /dev/null +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/FailedPromiseImpl.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) OSGi Alliance (2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.promise; + +import static java.util.Objects.requireNonNull; + +import java.lang.reflect.InvocationTargetException; + +/** + * Failed Promise implementation. + * <p> + * This class is not used directly by clients. Clients should use + * {@link PromiseFactory#failed(Throwable)} to create a failed {@link Promise}. + * + * @param <T> The result type associated with the Promise. + * @since 1.1 + * @ThreadSafe + * @author $Id$ + */ +final class FailedPromiseImpl<T> extends PromiseImpl<T> { + /** + * The failure of this failed Promise. + */ + private final Throwable fail; + + /** + * Initialize this failed Promise. + * + * @param fail The failure of this failed Promise. Must not be {@code null}. + * @param factory The factory to use for callbacks and scheduled operations. + */ + FailedPromiseImpl(Throwable fail, PromiseFactory factory) { + super(factory); + this.fail = requireNonNull(fail); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isDone() { + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public T getValue() throws InvocationTargetException { + throw new InvocationTargetException(fail); + } + + /** + * {@inheritDoc} + */ + @Override + public Throwable getFailure() { + return fail; + } + + /** + * Return a holder of the result of this PromiseImpl. + */ + @Override + Result<T> collect() { + return new Result<T>(fail); + } + + @Override + public String toString() { + return super.toString() + "[failed: " + fail + "]"; + } +} diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseExecutors.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java index 488ca7bac..09c0c038f 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseExecutors.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseFactory.java @@ -16,9 +16,11 @@ package org.osgi.util.promise; -import static java.util.Objects.requireNonNull; import static org.osgi.util.promise.PromiseImpl.uncaughtException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; @@ -35,27 +37,29 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.osgi.annotation.versioning.ConsumerType; +import org.osgi.util.promise.PromiseImpl.Result; /** - * The executors for Promise callbacks and scheduled operations. + * Promise factory to create Deferred and Promise objects. * <p> - * Instances of this class can be used to create a Deferred that can be resolved - * in the future as well as resolved Promises. The returned Deferred and Promise - * objects all use the executors used to construct this object for any callback - * or scheduled operation execution. + * Instances of this class can be used to create Deferred and Promise objects + * which use the executors used to construct this object for any callback or + * scheduled operation execution. * * @Immutable * @author $Id$ * @since 1.1 */ @ConsumerType -public class PromiseExecutors { +public class PromiseFactory { /** - * The default executors. + * The default factory which uses the default callback executor and default + * scheduled executor. */ - final static PromiseExecutors defaultExecutors = new PromiseExecutors( + final static PromiseFactory defaultFactory = new PromiseFactory( null, null); /** @@ -71,19 +75,19 @@ public class PromiseExecutors { /** - * Create a new PromiseExecutors with the specified callback executor. + * Create a new PromiseFactory with the specified callback executor. * <p> * The default scheduled executor will be used. * * @param callbackExecutor The executor to use for callbacks. {@code null} * can be specified for the default callback executor. */ - public PromiseExecutors(Executor callbackExecutor) { + public PromiseFactory(Executor callbackExecutor) { this(callbackExecutor, null); } /** - * Create a new PromiseExecutors with the specified callback executor and + * Create a new PromiseFactory with the specified callback executor and * specified scheduled executor. * * @param callbackExecutor The executor to use for callbacks. {@code null} @@ -92,7 +96,7 @@ public class PromiseExecutors { * operations. {@code null} can be specified for the default * scheduled executor. */ - public PromiseExecutors(Executor callbackExecutor, + public PromiseFactory(Executor callbackExecutor, ScheduledExecutorService scheduledExecutor) { this.callbackExecutor = callbackExecutor; this.scheduledExecutor = scheduledExecutor; @@ -103,9 +107,9 @@ public class PromiseExecutors { * * @return The executor to use for callbacks. This will be the default * callback executor if {@code null} was specified for the callback - * executor when this PromiseExecutors was created. + * executor when this PromiseFactory was created. */ - protected Executor executor() { + public Executor executor() { if (callbackExecutor == null) { return DefaultExecutors.callbackExecutor(); } @@ -113,13 +117,13 @@ public class PromiseExecutors { } /** - * Returns the executor to use for scheduled operations. + * Returns the scheduled executor to use for scheduled operations. * - * @return The executor to use for scheduled operations. This will be the - * default scheduled executor if {@code null} was specified for the - * scheduled executor when this PromiseExecutors was created. + * @return The scheduled executor to use for scheduled operations. This will + * be the default scheduled executor if {@code null} was specified + * for the scheduled executor when this PromiseFactory was created. */ - protected ScheduledExecutorService scheduledExecutor() { + public ScheduledExecutorService scheduledExecutor() { if (scheduledExecutor == null) { return DefaultExecutors.scheduledExecutor(); } @@ -128,14 +132,15 @@ public class PromiseExecutors { /** * Create a new Deferred with the callback executor and scheduled executor - * of this PromiseExecutors object. + * of this PromiseFactory object. * <p> * Use this method instead of {@link Deferred#Deferred()} to create a new * {@link Deferred} whose associated Promise uses executors other than the * default executors. * + * @param <T> The value type associated with the returned Deferred. * @return A new {@link Deferred} with the callback and scheduled executors - * of this PromiseExecutors object + * of this PromiseFactory object */ public <T> Deferred<T> deferred() { return new Deferred<>(this); @@ -145,7 +150,7 @@ public class PromiseExecutors { * Returns a new Promise that has been resolved with the specified value. * <p> * The returned Promise uses the callback executor and scheduled executor of - * this PromiseExecutors object + * this PromiseFactory object * <p> * Use this method instead of {@link Promises#resolved(Object)} to create a * Promise which uses executors other than the default executors. @@ -155,14 +160,14 @@ public class PromiseExecutors { * @return A new Promise that has been resolved with the specified value. */ public <T> Promise<T> resolved(T value) { - return new PromiseImpl<>(value, null, this); + return new ResolvedPromiseImpl<>(value, this); } /** * Returns a new Promise that has been resolved with the specified failure. * <p> * The returned Promise uses the callback executor and scheduled executor of - * this PromiseExecutors object + * this PromiseFactory object * <p> * Use this method instead of {@link Promises#failed(Throwable)} to create a * Promise which uses executors other than the default executors. @@ -173,37 +178,123 @@ public class PromiseExecutors { * @return A new Promise that has been resolved with the specified failure. */ public <T> Promise<T> failed(Throwable failure) { - return new PromiseImpl<>(null, requireNonNull(failure), this); + return new FailedPromiseImpl<>(failure, this); } /** * Returns a new Promise that will hold the result of the specified task. * <p> + * The returned Promise uses the callback executor and scheduled executor of + * this PromiseFactory object + * <p> * The specified task will be executed on the {@link #executor() callback * executor}. * + * @param <T> The value type associated with the returned Promise. * @param task The task whose result will be available from the returned * Promise. * @return A new Promise that will hold the result of the specified task. */ - public <V> Promise<V> submit(final Callable< ? extends V> task) { - requireNonNull(task); - final Deferred<V> deferred = deferred(); + public <T> Promise<T> submit(Callable< ? extends T> task) { + DeferredPromiseImpl<T> promise = new DeferredPromiseImpl<>(this); + Runnable submit = promise.new Submit(task); try { - executor().execute(new Runnable() { - @Override - public void run() { - try { - deferred.resolve(task.call()); - } catch (Throwable t) { - deferred.fail(t); + executor().execute(submit); + } catch (Exception t) { + promise.tryResolve(null, t); + } + return promise; + } + + /** + * Returns a new Promise that is a latch on the resolution of the specified + * Promises. + * <p> + * The returned Promise uses the callback executor and scheduled executor of + * this PromiseFactory object + * <p> + * The returned Promise acts as a gate and must be resolved after all of the + * specified Promises are resolved. + * + * @param <T> The value type of the List value associated with the returned + * Promise. + * @param <S> A subtype of the value type of the List value associated with + * the returned Promise. + * @param promises The Promises which must be resolved before the returned + * Promise must be resolved. Must not be {@code null} and all of + * the elements in the collection must not be {@code null}. + * @return A Promise that must be successfully resolved with a List of the + * values in the order of the specified Promises if all the + * specified Promises are successfully resolved. The List in the + * returned Promise is the property of the caller and is modifiable. + * The returned Promise must be resolved with a failure of + * {@link FailedPromisesException} if any of the specified Promises + * are resolved with a failure. The failure + * {@link FailedPromisesException} must contain all of the specified + * Promises which resolved with a failure. + */ + public <T, S extends T> Promise<List<T>> all( + Collection<Promise<S>> promises) { + if (promises.isEmpty()) { + List<T> result = new ArrayList<>(); + return resolved(result); + } + + DeferredPromiseImpl<List<T>> promise = new DeferredPromiseImpl<>(this); + /* make a copy and capture the ordering */ + List<Promise<S>> list = new ArrayList<>(promises); + All<T,S> all = new All<>(promise, list); + for (Promise<S> p : list) { + p.onResolve(all); + } + return promise; + } + + /** + * A callback used to resolve the specified Promise when the specified list + * of Promises are resolved for the {@link PromiseFactory#all(Collection)} + * method. + * + * @ThreadSafe + */ + private static final class All<T, S extends T> implements Runnable { + private final DeferredPromiseImpl<List<T>> promise; + private final List<Promise<S>> promises; + private final AtomicInteger promiseCount; + + All(DeferredPromiseImpl<List<T>> promise, + List<Promise<S>> promises) { + this.promise = promise; + this.promises = promises; + this.promiseCount = new AtomicInteger(promises.size()); + } + + @Override + public void run() { + if (promiseCount.decrementAndGet() != 0) { + return; + } + List<T> value = new ArrayList<>(promises.size()); + List<Promise< ? >> failed = new ArrayList<>(promises.size()); + Throwable cause = null; + for (Promise<S> p : promises) { + Result<S> result = PromiseImpl.collect(p); + if (result.fail != null) { + failed.add(p); + if (cause == null) { + cause = result.fail; } + } else { + value.add(result.value); } - }); - } catch (Exception t) { - deferred.fail(t); + } + if (failed.isEmpty()) { + promise.tryResolve(value, null); + } else { + promise.tryResolve(null, + new FailedPromisesException(failed, cause)); + } } - return deferred.getPromise(); } /** @@ -284,7 +375,7 @@ public class PromiseExecutors { } } Thread t = delegateThreadFactory.newThread(r); - t.setName("PromiseImpl," + t.getName()); + t.setName("PromiseFactory," + t.getName()); t.setDaemon(true); return t; } diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java index 5d02ff653..71ecd591b 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/PromiseImpl.java @@ -18,10 +18,7 @@ package org.osgi.util.promise; import static java.util.Objects.requireNonNull; -import java.lang.reflect.InvocationTargetException; -import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -31,178 +28,47 @@ import org.osgi.util.function.Function; import org.osgi.util.function.Predicate; /** - * Promise implementation. + * Abstract Promise implementation. * <p> * This class is not used directly by clients. Clients should use - * {@link Deferred} to create a resolvable {@link Promise}. + * {@link PromiseFactory} to create a {@link Promise}. * * @param <T> The result type associated with the Promise. * @ThreadSafe * @author $Id$ */ -final class PromiseImpl<T> implements Promise<T> { +abstract class PromiseImpl<T> implements Promise<T> { /** - * The executors to use for callbacks and scheduled operations. + * The factory to use for callbacks and scheduled operations. */ - private final PromiseExecutors executors; + private final PromiseFactory factory; /** * A ConcurrentLinkedQueue to hold the callbacks for this Promise, so no * additional synchronization is required to write to or read from the * queue. */ private final ConcurrentLinkedQueue<Runnable> callbacks; - /** - * A CountDownLatch to manage the resolved state of this Promise. - * <p> - * This object is used as the synchronizing object to provide a critical - * section in {@link #tryResolve(Object, Throwable)} so that only a single - * thread can write the resolved state variables and open the latch. - * <p> - * The resolved state variables, {@link #value} and {@link #fail}, must only - * be written when the latch is closed (getCount() != 0) and must only be - * read when the latch is open (getCount() == 0). The latch state must - * always be checked before writing or reading since the resolved state - * variables' memory consistency is guarded by the latch. - */ - private final CountDownLatch resolved; - /** - * The value of this Promise if successfully resolved. - * - * @GuardedBy("resolved") - * @see #resolved - */ - private T value; - /** - * The failure of this Promise if resolved with a failure or {@code null} if - * successfully resolved. - * - * @GuardedBy("resolved") - * @see #resolved - */ - private Throwable fail; /** * Initialize this Promise. * - * @param executors The executors to use for callbacks and scheduled - * operations. - */ - PromiseImpl(PromiseExecutors executors) { - this.executors = executors; - callbacks = new ConcurrentLinkedQueue<>(); - resolved = new CountDownLatch(1); - } - - /** - * Initialize and resolve this Promise. - * - * @param v The value of this resolved Promise. - * @param f The failure of this resolved Promise. - * @param executors The executors to use for callbacks and scheduled + * @param factory The factory to use for callbacks and scheduled * operations. */ - PromiseImpl(T v, Throwable f, PromiseExecutors executors) { - if (f == null) { - value = v; - } else { - fail = f; - } - this.executors = executors; + PromiseImpl(PromiseFactory factory) { + this.factory = factory; callbacks = new ConcurrentLinkedQueue<>(); - resolved = new CountDownLatch(0); - } - - /** - * Try to resolve this Promise. - * <p> - * If this Promise was already resolved, return false. Otherwise, resolve - * this Promise and return true. - * - * @param v The value of this Promise. - * @param f The failure of this Promise. - * @return false if this Promise was already resolved; true if this method - * resolved this Promise. - * @since 1.1 - */ - boolean tryResolve(T v, Throwable f) { - // critical section: only one resolver at a time - synchronized (resolved) { - if (resolved.getCount() == 0) { - return false; - } - /* - * The resolved state variables must be set before opening the - * latch. This safely publishes them to be read by other threads - * that must verify the latch is open before reading. - */ - if (f == null) { - value = v; - } else { - fail = f; - } - resolved.countDown(); - } - notifyCallbacks(); // call any registered callbacks - return true; } /** - * Resolve this Promise. - * <p> - * If this Promise was already resolved, throw IllegalStateException. - * Otherwise, resolve this Promise. + * Return a new {@link DeferredPromiseImpl} using the {@link PromiseFactory} + * of this PromiseImpl. * - * @param v The value of this Promise. - * @param f The failure of this Promise. - * @throws IllegalStateException If this Promise was already resolved. - */ - void resolve(T v, Throwable f) { - if (!tryResolve(v, f)) { - throw new IllegalStateException("Already resolved"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isDone() { - return resolved.getCount() == 0; - } - - /** - * {@inheritDoc} - */ - @Override - public T getValue() throws InvocationTargetException, InterruptedException { - resolved.await(); - if (fail == null) { - return value; - } - throw new InvocationTargetException(fail); - } - - /** - * {@inheritDoc} - */ - @Override - public Throwable getFailure() throws InterruptedException { - resolved.await(); - return fail; - } - - /** + * @return A new DeferredPromiseImpl. * @since 1.1 */ - @Override - public String toString() { - if (!isDone()) { // ensure latch open before reading state - return super.toString() + "[unresolved]"; - } - if (fail == null) { - return super.toString() + "[resolved: " + value + "]"; - } - return super.toString() + "[failed: " + fail + "]"; + <V> DeferredPromiseImpl<V> deferred() { + return new DeferredPromiseImpl<>(factory); } /** @@ -218,8 +84,8 @@ final class PromiseImpl<T> implements Promise<T> { /** * Call any registered callbacks if this Promise is resolved. */ - private void notifyCallbacks() { - if (resolved.getCount() != 0) { + void notifyCallbacks() { + if (!isDone()) { return; // return if not resolved } /* @@ -230,7 +96,7 @@ final class PromiseImpl<T> implements Promise<T> { for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) { try { try { - executors.executor().execute(callback); + factory.executor().execute(callback); } catch (RejectedExecutionException e) { callback.run(); } @@ -241,6 +107,26 @@ final class PromiseImpl<T> implements Promise<T> { } /** + * Schedule a operation on the scheduled executor. + * + * @since 1.1 + */ + ScheduledFuture< ? > schedule(Runnable operation, long delay, + TimeUnit unit) { + try { + try { + return factory.scheduledExecutor().schedule(operation, delay, + unit); + } catch (RejectedExecutionException e) { + operation.run(); + } + } catch (Throwable t) { + uncaughtException(t); + } + return null; + } + + /** * Handle an uncaught exception from a Runnable. * * @param t The uncaught exception. @@ -332,7 +218,7 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) { - PromiseImpl<R> chained = new PromiseImpl<>(executors); + DeferredPromiseImpl<R> chained = deferred(); onResolve(chained.new Then<>(this, success, failure)); return chained; } @@ -346,494 +232,94 @@ final class PromiseImpl<T> implements Promise<T> { } /** - * A callback used to chain promises for the {@link #then(Success, Failure)} - * method. - * - * @Immutable - */ - private final class Then<P> implements Runnable { - private final PromiseImpl<P> promise; - private final Success<P, ? extends T> success; - private final Failure failure; - - @SuppressWarnings("unchecked") - Then(PromiseImpl<P> promise, Success< ? super P, ? extends T> success, - Failure failure) { - this.promise = promise; - this.success = (Success<P, ? extends T>) success; - this.failure = failure; - } - - @Override - public void run() { - Result<P> result = promise.collect(); - if (result.fail != null) { - if (failure != null) { - try { - failure.fail(promise); - } catch (Throwable e) { - result.fail = e; // propagate new exception - } - } - } else if (success != null) { - Promise< ? extends T> returned = null; - try { - returned = success.call(promise); - } catch (Throwable e) { - result.fail = e; // propagate new exception - } - if (returned != null) { - // resolve chained when returned promise is resolved - returned.onResolve(new Chain(returned)); - return; - } - } - tryResolve(null, result.fail); - } - } - - /** - * A callback used to resolve the chained Promise when the Promise is - * resolved. - * - * @Immutable - */ - private final class Chain implements Runnable { - private final Promise< ? extends T> promise; - - Chain(Promise< ? extends T> promise) { - this.promise = promise; - } - - @Override - public void run() { - Result<T> result = collect(promise); - tryResolve(result.value, result.fail); - } - } - - /** - * A callback used to resolve the chained Promise when the PromiseImpl is - * resolved. - * - * @Immutable - * @since 1.1 - */ - private final class ChainImpl implements Runnable { - private final PromiseImpl<T> promise; - - ChainImpl(PromiseImpl<T> promise) { - this.promise = promise; - } - - @Override - public void run() { - Result<T> result = promise.collect(); - tryResolve(result.value, result.fail); - } - } - - /** * {@inheritDoc} * * @since 1.1 */ @Override public Promise<T> thenAccept(Consumer< ? super T> consumer) { - PromiseImpl<T> chained = new PromiseImpl<>(executors); + DeferredPromiseImpl<T> chained = deferred(); onResolve(chained.new ThenAccept(this, consumer)); return chained; } - /** - * A callback used to resolve the chained Promise and call a Consumer when - * the PromiseImpl is resolved. - * - * @Immutable - * @since 1.1 - */ - private final class ThenAccept implements Runnable { - private final PromiseImpl<T> promise; - private final Consumer< ? super T> consumer; - - ThenAccept(PromiseImpl<T> promise, Consumer< ? super T> consumer) { - this.promise = promise; - this.consumer = requireNonNull(consumer); - } - - @Override - public void run() { - Result<T> result = promise.collect(); - if (result.fail == null) { - try { - consumer.accept(result.value); - } catch (Throwable e) { - result.fail = e; - } - } - tryResolve(result.value, result.fail); - } - } - - /** - * Resolve this Promise with the specified Promise. - * <p> - * If the specified Promise is successfully resolved, this Promise is - * resolved with the value of the specified Promise. If the specified - * Promise is resolved with a failure, this Promise is resolved with the - * failure of the specified Promise. - * - * @param with A Promise whose value or failure must be used to resolve this - * Promise. Must not be {@code null}. - * @return A Promise that is resolved only when this Promise is resolved by - * the specified Promise. The returned Promise must be successfully - * resolved with the value {@code null}, if this Promise was - * resolved by the specified Promise. The returned Promise must be - * resolved with a failure of {@link IllegalStateException}, if this - * Promise was already resolved when the specified Promise was - * resolved. - */ - Promise<Void> resolveWith(Promise<? extends T> with) { - PromiseImpl<Void> chained = new PromiseImpl<>(executors); - with.onResolve(chained.new ResolveWith<>(this, with)); - return chained; - } - - /** - * A callback used to resolve a Promise with another Promise for the - * {@link PromiseImpl#resolveWith(Promise)} method. - * - * @Immutable - */ - private final class ResolveWith<P> implements Runnable { - private final PromiseImpl<P> promise; - private final Promise< ? extends P> with; - - ResolveWith(PromiseImpl<P> promise, Promise< ? extends P> with) { - this.promise = promise; - this.with = requireNonNull(with); - } - - @Override - public void run() { - Throwable f = null; - Result<P> result = collect(with); - try { - promise.resolve(result.value, result.fail); - } catch (Throwable e) { - f = e; // propagate new exception - } - tryResolve(null, f); - } - } /** * {@inheritDoc} */ @Override public Promise<T> filter(Predicate<? super T> predicate) { - PromiseImpl<T> chained = new PromiseImpl<>(executors); + DeferredPromiseImpl<T> chained = deferred(); onResolve(chained.new Filter(this, predicate)); return chained; } /** - * A callback used by the {@link PromiseImpl#filter(Predicate)} method. - * - * @Immutable - */ - private final class Filter implements Runnable { - private final PromiseImpl<T> promise; - private final Predicate<? super T> predicate; - - Filter(PromiseImpl<T> promise, Predicate< ? super T> predicate) { - this.promise = promise; - this.predicate = requireNonNull(predicate); - } - - @Override - public void run() { - Result<T> result = promise.collect(); - if (result.fail == null) { - try { - if (!predicate.test(result.value)) { - result.fail = new NoSuchElementException(); - } - } catch (Throwable e) { // propagate new exception - result.fail = e; - } - } - tryResolve(result.value, result.fail); - } - } - - /** * {@inheritDoc} */ @Override public <R> Promise<R> map(Function<? super T, ? extends R> mapper) { - PromiseImpl<R> chained = new PromiseImpl<>(executors); + DeferredPromiseImpl<R> chained = deferred(); onResolve(chained.new Map<>(this, mapper)); return chained; } - /** - * A callback used by the {@link PromiseImpl#map(Function)} method. - * - * @Immutable - */ - private final class Map<P> implements Runnable { - private final PromiseImpl<P> promise; - private final Function<? super P, ? extends T> mapper; - - Map(PromiseImpl<P> promise, - Function< ? super P, ? extends T> mapper) { - this.promise = promise; - this.mapper = requireNonNull(mapper); - } - - @Override - public void run() { - Result<P> result = promise.collect(); - T v = null; - if (result.fail == null) { - try { - v = mapper.apply(result.value); - } catch (Throwable e) { // propagate new exception - result.fail = e; - } - } - tryResolve(v, result.fail); - } - } /** * {@inheritDoc} */ @Override public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) { - PromiseImpl<R> chained = new PromiseImpl<>(executors); + DeferredPromiseImpl<R> chained = deferred(); onResolve(chained.new FlatMap<>(this, mapper)); return chained; } /** - * A callback used by the {@link PromiseImpl#flatMap(Function)} method. - * - * @Immutable - */ - private final class FlatMap<P> implements Runnable { - private final PromiseImpl<P> promise; - private final Function< ? super P,Promise< ? extends T>> mapper; - - FlatMap(PromiseImpl<P> promise, - Function< ? super P,Promise< ? extends T>> mapper) { - this.promise = promise; - this.mapper = requireNonNull(mapper); - } - - @Override - public void run() { - Result<P> result = promise.collect(); - if (result.fail == null) { - Promise< ? extends T> flatmap = null; - try { - flatmap = mapper.apply(result.value); - } catch (Throwable e) { // propagate new exception - result.fail = e; - } - if (flatmap != null) { - flatmap.onResolve(new Chain(flatmap)); - return; - } - } - tryResolve(null, result.fail); - } - } - - /** * {@inheritDoc} */ @Override public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) { - PromiseImpl<T> chained = new PromiseImpl<>(executors); + DeferredPromiseImpl<T> chained = deferred(); onResolve(chained.new Recover(this, recovery)); return chained; } /** - * A callback used by the {@link PromiseImpl#recover(Function)} method. - * - * @Immutable - */ - private final class Recover implements Runnable { - private final PromiseImpl<T> promise; - private final Function<Promise< ? >, ? extends T> recovery; - - Recover(PromiseImpl<T> promise, - Function<Promise< ? >, ? extends T> recovery) { - this.promise = promise; - this.recovery = requireNonNull(recovery); - } - - @Override - public void run() { - Result<T> result = promise.collect(); - if (result.fail != null) { - try { - T v = recovery.apply(promise); - if (v != null) { - result.value = v; - result.fail = null; - } - } catch (Throwable e) { // propagate new exception - result.fail = e; - } - } - tryResolve(result.value, result.fail); - } - } - - /** * {@inheritDoc} */ @Override public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) { - PromiseImpl<T> chained = new PromiseImpl<>(executors); + DeferredPromiseImpl<T> chained = deferred(); onResolve(chained.new RecoverWith(this, recovery)); return chained; } /** - * A callback used by the {@link PromiseImpl#recoverWith(Function)} method. - * - * @Immutable - */ - private final class RecoverWith implements Runnable { - private final PromiseImpl<T> promise; - private final Function<Promise<?>, Promise<? extends T>> recovery; - - RecoverWith(PromiseImpl<T> promise, - Function<Promise< ? >,Promise< ? extends T>> recovery) { - this.promise = promise; - this.recovery = requireNonNull(recovery); - } - - @Override - public void run() { - Result<T> result = promise.collect(); - if (result.fail != null) { - Promise< ? extends T> recovered = null; - try { - recovered = recovery.apply(promise); - } catch (Throwable e) { // propagate new exception - result.fail = e; - } - if (recovered != null) { - recovered.onResolve(new Chain(recovered)); - return; - } - } - tryResolve(result.value, result.fail); - } - } - - /** * {@inheritDoc} */ @Override public Promise<T> fallbackTo(Promise<? extends T> fallback) { - PromiseImpl<T> chained = new PromiseImpl<>(executors); + DeferredPromiseImpl<T> chained = deferred(); onResolve(chained.new FallbackTo(this, fallback)); return chained; } /** - * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method. - * - * @Immutable - */ - private final class FallbackTo implements Runnable { - private final PromiseImpl<T> promise; - private final Promise<? extends T> fallback; - - FallbackTo(PromiseImpl<T> promise, Promise< ? extends T> fallback) { - this.promise = promise; - this.fallback = requireNonNull(fallback); - } - - @Override - public void run() { - Result<T> result = promise.collect(); - if (result.fail != null) { - fallback.onResolve(new FallbackChain(fallback, result.fail)); - return; - } - tryResolve(result.value, null); - } - } - - /** - * A callback used to resolve the chained Promise when the fallback Promise - * is resolved. - * - * @Immutable - * @since 1.1 - */ - private final class FallbackChain implements Runnable { - private final Promise< ? extends T> fallback; - private final Throwable failure; - - FallbackChain(Promise< ? extends T> fallback, Throwable failure) { - this.fallback = fallback; - this.failure = failure; - } - - @Override - public void run() { - Result<T> result = collect(fallback); - if (result.fail != null) { - result.fail = failure; - } - tryResolve(result.value, result.fail); - } - } - - /** - * Schedule a operation on the scheduled executor. - * - * @since 1.1 - */ - ScheduledFuture< ? > schedule(Runnable operation, long delay, - TimeUnit unit) { - try { - try { - return executors.scheduledExecutor().schedule(operation, delay, - unit); - } catch (RejectedExecutionException e) { - operation.run(); - } - } catch (Throwable t) { - uncaughtException(t); - } - return null; - } - - /** * {@inheritDoc} * * @since 1.1 */ @Override public Promise<T> timeout(long millis) { - PromiseImpl<T> chained = new PromiseImpl<>(executors); + DeferredPromiseImpl<T> chained = deferred(); + onResolve(chained.new ChainImpl(this)); if (!isDone()) { - PromiseImpl<T> timedout = new PromiseImpl<>(null, - new TimeoutException(), executors); + PromiseImpl<T> timedout = new FailedPromiseImpl<>( + new TimeoutException(), factory); onResolve(new Timeout(chained.new ChainImpl(timedout), millis, TimeUnit.MILLISECONDS)); } - onResolve(chained.new ChainImpl(this)); return chained; } @@ -866,7 +352,7 @@ final class PromiseImpl<T> implements Promise<T> { */ @Override public Promise<T> delay(long millis) { - PromiseImpl<T> chained = new PromiseImpl<>(executors); + DeferredPromiseImpl<T> chained = deferred(); onResolve(new Delay(chained.new ChainImpl(this), millis, TimeUnit.MILLISECONDS)); return chained; @@ -903,8 +389,8 @@ final class PromiseImpl<T> implements Promise<T> { * @since 1.1 */ static final class Result<P> { - Throwable fail; P value; + Throwable fail; Result(P value) { this.value = value; @@ -922,15 +408,7 @@ final class PromiseImpl<T> implements Promise<T> { * * @since 1.1 */ - Result<T> collect() { - if (!isDone()) { // ensure latch open before reading state - return new Result<T>(new AssertionError("promise not resolved")); - } - if (fail == null) { - return new Result<T>(value); - } - return new Result<T>(fail); - } + abstract Result<T> collect(); /** * Return a holder of the result of the specified Promise. diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java index a25bdf3c5..650a2b0e2 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/Promises.java @@ -16,20 +16,21 @@ package org.osgi.util.promise; -import static org.osgi.util.promise.PromiseExecutors.defaultExecutors; +import static org.osgi.util.promise.PromiseFactory.defaultFactory; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.osgi.util.promise.PromiseImpl.Result; /** * Static helper methods for {@link Promise}s. + * <p> + * These methods return Promises which use the default callback executor and + * default scheduled executor. See {@link PromiseFactory} for similar methods + * which use executors other than the default executors. * * @ThreadSafe + * @see PromiseFactory * @author $Id$ */ public class Promises { @@ -45,10 +46,10 @@ public class Promises { * @return A new Promise which uses the default callback executor and * default scheduled executor that has been resolved with the * specified value. - * @see PromiseExecutors#resolved(Object) + * @see PromiseFactory#resolved(Object) */ public static <T> Promise<T> resolved(T value) { - return defaultExecutors.resolved(value); + return defaultFactory.resolved(value); } /** @@ -60,10 +61,10 @@ public class Promises { * @return A new Promise which uses the default callback executor and * default scheduled executor that has been resolved with the * specified failure. - * @see PromiseExecutors#failed(Throwable) + * @see PromiseFactory#failed(Throwable) */ public static <T> Promise<T> failed(Throwable failure) { - return defaultExecutors.failed(failure); + return defaultFactory.failed(failure); } /** @@ -91,58 +92,11 @@ public class Promises { * specified Promises are resolved with a failure. The failure * {@link FailedPromisesException} must contain all of the specified * Promises which resolved with a failure. - * @see #all(Deferred, Collection) - */ - public static <T, S extends T> Promise<List<T>> all(Collection<Promise<S>> promises) { - return all(defaultExecutors.<List<T>> deferred(), promises); - } - - /** - * Resolves the Promise returned by the specified Deferred when the - * specified Promises are all resolved. - * <p> - * The returned Promise acts as a gate and must be resolved after all of the - * specified Promises are resolved. - * - * @param <T> The value type of the List value associated with the returned - * Promise. - * @param <S> A subtype of the value type of the List value associated with - * the returned Promise. - * @param deferred The specified Deferred is used to obtain and resolve the - * returned Promise. Must not be {@code null} and must not have - * already resolved the returned Promise. - * @param promises The Promises which must be resolved before the returned - * Promise must be resolved. Must not be {@code null} and all of - * the elements in the collection must not be {@code null}. - * @return The Promise from the specified Deferred. It is resolved only when - * all the specified Promises are resolved. The returned Promise - * must be successfully resolved with a List of the values in the - * order of the specified Promises if all the specified Promises are - * successfully resolved. The List in the returned Promise is the - * property of the caller and is modifiable. The returned Promise - * must be resolved with a failure of - * {@link FailedPromisesException} if any of the specified Promises - * are resolved with a failure. The failure - * {@link FailedPromisesException} must contain all of the specified - * Promises which resolved with a failure. - * @since 1.1 - * @see PromiseExecutors#deferred() + * @see PromiseFactory#all(Collection) */ public static <T, S extends T> Promise<List<T>> all( - Deferred<List<T>> deferred, Collection<Promise<S>> promises) { - if (promises.isEmpty()) { - List<T> result = new ArrayList<>(); - deferred.resolve(result); - } else { - /* make a copy and capture the ordering */ - List<Promise< ? extends T>> list = new ArrayList<Promise< ? extends T>>( - promises); - All<T> all = new All<>(deferred, list); - for (Promise< ? extends T> promise : list) { - promise.onResolve(all); - } - } - return deferred.getPromise(); + Collection<Promise<S>> promises) { + return defaultFactory.all(promises); } /** @@ -167,95 +121,12 @@ public class Promises { * Promises are resolved with a failure. The failure * {@link FailedPromisesException} must contain all of the specified * Promises which resolved with a failure. - * @see #all(Deferred, Promise...) + * @see PromiseFactory#all(Collection) */ @SafeVarargs public static <T> Promise<List<T>> all(Promise<? extends T>... promises) { @SuppressWarnings("unchecked") List<Promise<T>> list = Arrays.asList((Promise<T>[]) promises); - return all(list); - } - - /** - * Resolves the Promise returned by the specified Deferred when the - * specified Promises are all resolved. - * <p> - * The new Promise acts as a gate and must be resolved after all of the - * specified Promises are resolved. - * - * @param <T> The value type associated with the specified Promises. - * @param deferred The specified Deferred is used to obtain and resolve the - * returned Promise. Must not be {@code null} and must not have - * already resolved the returned Promise. - * @param promises The Promises which must be resolved before the returned - * Promise must be resolved. Must not be {@code null} and all of - * the arguments must not be {@code null}. - * @return The Promise from the specified Deferred. It is resolved only when - * all the specified Promises are resolved. The returned Promise - * must be successfully resolved with a List of the values in the - * order of the specified Promises if all the specified Promises are - * successfully resolved. The List in the returned Promise is the - * property of the caller and is modifiable. The returned Promise - * must be resolved with a failure of - * {@link FailedPromisesException} if any of the specified Promises - * are resolved with a failure. The failure - * {@link FailedPromisesException} must contain all of the specified - * Promises which resolved with a failure. - * @since 1.1 - * @see PromiseExecutors#deferred() - */ - @SafeVarargs - public static <T> Promise<List<T>> all(Deferred<List<T>> deferred, - Promise< ? extends T>... promises) { - @SuppressWarnings("unchecked") - List<Promise<T>> list = Arrays.asList((Promise<T>[]) promises); - return all(deferred, list); - } - - /** - * A callback used to resolve a Deferred when the specified list of Promises - * are resolved for the {@link Promises#all(Collection)} method. - * - * @ThreadSafe - */ - private static final class All<T> implements Runnable { - private final Deferred<List<T>> chained; - private final List<Promise<? extends T>> promises; - private final AtomicInteger promiseCount; - - All(Deferred<List<T>> chained, List<Promise< ? extends T>> promises) { - if (chained.getPromise().isDone()) { - throw new IllegalStateException("Already resolved"); - } - this.chained = chained; - this.promises = promises; - this.promiseCount = new AtomicInteger(promises.size()); - } - - @Override - public void run() { - if (promiseCount.decrementAndGet() != 0) { - return; - } - List<T> value = new ArrayList<>(promises.size()); - List<Promise<?>> failed = new ArrayList<>(promises.size()); - Throwable cause = null; - for (Promise<? extends T> promise : promises) { - Result<T> result = PromiseImpl.collect(promise); - if (result.fail != null) { - failed.add(promise); - if (cause == null) { - cause = result.fail; - } - } else { - value.add(result.value); - } - } - if (failed.isEmpty()) { - chained.resolve(value); - } else { - chained.fail(new FailedPromisesException(failed, cause)); - } - } + return defaultFactory.all(list); } } diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/ResolvedPromiseImpl.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/ResolvedPromiseImpl.java new file mode 100644 index 000000000..d65146576 --- /dev/null +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/promise/ResolvedPromiseImpl.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) OSGi Alliance (2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.promise; + +/** + * Resolved Promise implementation. + * <p> + * This class is not used directly by clients. Clients should use + * {@link PromiseFactory#resolved(Object)} to create a resolved {@link Promise}. + * + * @param <T> The result type associated with the Promise. + * @since 1.1 + * @ThreadSafe + * @author $Id$ + */ +final class ResolvedPromiseImpl<T> extends PromiseImpl<T> { + /** + * The value of this resolved Promise. + */ + private final T value; + + /** + * Initialize this resolved Promise. + * + * @param value The value of this resolved Promise. + * @param factory The factory to use for callbacks and scheduled operations. + */ + ResolvedPromiseImpl(T value, PromiseFactory factory) { + super(factory); + this.value = value; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isDone() { + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public T getValue() { + return value; + } + + /** + * {@inheritDoc} + */ + @Override + public Throwable getFailure() { + return null; + } + + /** + * Return a holder of the result of this PromiseImpl. + */ + @Override + Result<T> collect() { + return new Result<T>(value); + } + + @Override + public String toString() { + return super.toString() + "[resolved: " + value + "]"; + } +} diff --git a/bundles/org.eclipse.osgi.util/src/org/osgi/util/xml/XMLParserActivator.java b/bundles/org.eclipse.osgi.util/src/org/osgi/util/xml/XMLParserActivator.java index 64be3f5cf..a7e84c4ac 100644 --- a/bundles/org.eclipse.osgi.util/src/org/osgi/util/xml/XMLParserActivator.java +++ b/bundles/org.eclipse.osgi.util/src/org/osgi/util/xml/XMLParserActivator.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2002, 2015). All Rights Reserved. + * Copyright (c) OSGi Alliance (2002, 2017). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,9 +26,11 @@ import java.util.Collections; import java.util.Hashtable; import java.util.Iterator; import java.util.List; + import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.FactoryConfigurationError; import javax.xml.parsers.SAXParserFactory; + import org.osgi.framework.Bundle; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; @@ -421,7 +423,10 @@ public class XMLParserActivator implements BundleActivator, ServiceFactory<Objec */ private Object getFactory(String parserFactoryClassName) throws FactoryConfigurationError { try { - return bundleContext.getBundle().loadClass(parserFactoryClassName).newInstance(); + return bundleContext.getBundle() + .loadClass(parserFactoryClassName) + .getConstructor() + .newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { diff --git a/bundles/org.eclipse.osgi/osgi/src/org/osgi/service/log/Logger.java b/bundles/org.eclipse.osgi/osgi/src/org/osgi/service/log/Logger.java index 863d1404f..3424a62fd 100644 --- a/bundles/org.eclipse.osgi/osgi/src/org/osgi/service/log/Logger.java +++ b/bundles/org.eclipse.osgi/osgi/src/org/osgi/service/log/Logger.java @@ -106,11 +106,11 @@ public interface Logger { void trace(String format, Object... arguments); /** - * Call the specified function if logging enabled for the + * Perform the specified operation if logging enabled for the * {@link LogLevel#TRACE} level. * - * @param consumer The function to call passing this Logger. - * @throws E An exception thrown by the function. + * @param consumer The operation to perform on this Logger. + * @throws E An exception thrown by the operation. */ <E extends Exception> void trace(LoggerConsumer<E> consumer) throws E; @@ -155,11 +155,11 @@ public interface Logger { void debug(String format, Object... arguments); /** - * Call the specified function if logging enabled for the + * Perform the specified operation if logging enabled for the * {@link LogLevel#DEBUG} level. * - * @param consumer The function to call passing this Logger. - * @throws E An exception thrown by the function. + * @param consumer The operation to perform on this Logger. + * @throws E An exception thrown by the operation. */ <E extends Exception> void debug(LoggerConsumer<E> consumer) throws E; @@ -204,11 +204,11 @@ public interface Logger { void info(String format, Object... arguments); /** - * Call the specified function if logging enabled for the + * Perform the specified operation if logging enabled for the * {@link LogLevel#INFO} level. * - * @param consumer The function to call passing this Logger. - * @throws E An exception thrown by the function. + * @param consumer The operation to perform on this Logger. + * @throws E An exception thrown by the operation. */ <E extends Exception> void info(LoggerConsumer<E> consumer) throws E; @@ -253,11 +253,11 @@ public interface Logger { void warn(String format, Object... arguments); /** - * Call the specified function if logging enabled for the + * Perform the specified operation if logging enabled for the * {@link LogLevel#WARN} level. * - * @param consumer The function to call passing this Logger. - * @throws E An exception thrown by the function. + * @param consumer The operation to perform on this Logger. + * @throws E An exception thrown by the operation. */ <E extends Exception> void warn(LoggerConsumer<E> consumer) throws E; @@ -302,11 +302,11 @@ public interface Logger { void error(String format, Object... arguments); /** - * Call the specified function if logging enabled for the + * Perform the specified operation if logging enabled for the * {@link LogLevel#ERROR} level. * - * @param consumer The function to call passing this Logger. - * @throws E An exception thrown by the function. + * @param consumer The operation to perform on this Logger. + * @throws E An exception thrown by the operation. */ <E extends Exception> void error(LoggerConsumer<E> consumer) throws E; diff --git a/bundles/org.eclipse.osgi/osgi/src/org/osgi/service/log/LoggerConsumer.java b/bundles/org.eclipse.osgi/osgi/src/org/osgi/service/log/LoggerConsumer.java index 82201dc52..332fa18a8 100644 --- a/bundles/org.eclipse.osgi/osgi/src/org/osgi/service/log/LoggerConsumer.java +++ b/bundles/org.eclipse.osgi/osgi/src/org/osgi/service/log/LoggerConsumer.java @@ -19,7 +19,7 @@ package org.osgi.service.log; import org.osgi.annotation.versioning.ConsumerType; /** - * A function that accepts a {@link Logger} argument and produces no result. + * An operation that accepts a {@link Logger} argument and produces no result. * <p> * This is a functional interface and can be used as the assignment target for a * lambda expression or method reference. @@ -33,10 +33,10 @@ import org.osgi.annotation.versioning.ConsumerType; @FunctionalInterface public interface LoggerConsumer<E extends Exception> { /** - * Applies this function to the specified {@link Logger}. + * Perform this operation on the specified {@link Logger}. * - * @param l The {@link Logger} input to this function. - * @throws E An exception thrown by the method. + * @param l The {@link Logger} input to this operation. + * @throws E An exception thrown by the operation. */ void accept(Logger l) throws E; } |