diff options
author | Thomas Watson | 2017-12-04 19:20:43 +0000 |
---|---|---|
committer | Thomas Watson | 2017-12-04 19:20:43 +0000 |
commit | 8d22dd6d2bf36a4f893e41f6e8485d088a27bd57 (patch) | |
tree | dd4969f7b73ee565566c988f23e1a43cd7258aa1 | |
parent | d79fee4fad1cc0a55e6587f5de6743d551aae696 (diff) | |
download | rt.equinox.bundles-I20171208-2000.tar.gz rt.equinox.bundles-I20171208-2000.tar.xz rt.equinox.bundles-I20171208-2000.zip |
Bug 528116 - [osgi R7] update the latest R7 OSGi APIS4_8_0_M4I20171215-0120I20171214-2000I20171214-0120I20171213-2000I20171212-2000I20171211-2000I20171210-2000I20171209-1500I20171209-1020I20171208-2000I20171206-2000I20171206-0800I20171205-2000I20171205-0800I20171205-0250I20171205-0030I20171204-2000
10 files changed, 180 insertions, 140 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java index ef8bb1420..cf16c2125 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java @@ -33,6 +33,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.Set; @@ -64,6 +65,7 @@ import org.osgi.util.function.Function; import org.osgi.util.function.Predicate; import org.osgi.util.promise.Deferred; import org.osgi.util.promise.Promise; +import org.osgi.util.promise.PromiseFactory; import org.osgi.util.promise.TimeoutException; import org.osgi.util.pushstream.PushEvent.EventType; @@ -77,7 +79,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { protected final PushStreamProvider psp; - protected final PushStreamExecutors executors; + protected final PromiseFactory promiseFactory; protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING); @@ -91,9 +93,9 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { protected abstract void upstreamClose(PushEvent< ? > close); AbstractPushStreamImpl(PushStreamProvider psp, - PushStreamExecutors executors) { + PromiseFactory promiseFactory) { this.psp = psp; - this.executors = executors; + this.promiseFactory = promiseFactory; } protected long handleEvent(PushEvent< ? extends T> event) { @@ -216,7 +218,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public PushStream<T> filter(Predicate< ? super T> predicate) { AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); updateNext((event) -> { try { if (!event.isTerminal()) { @@ -239,7 +241,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) { AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); updateNext(event -> { try { if (!event.isTerminal()) { @@ -257,10 +259,62 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { } @Override + public <R> PushStream<R> asyncMap(int n, int delay, + Function< ? super T,Promise< ? extends R>> mapper) + throws IllegalArgumentException, NullPointerException { + + AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>( + psp, promiseFactory, this); + Semaphore s = new Semaphore(n); + updateNext(event -> { + try { + if (event.isTerminal()) { + s.acquire(n); + eventStream.close(event.nodata()); + return ABORT; + } + + s.acquire(1); + + Promise< ? extends R> p = mapper.apply(event.getData()); + p.thenAccept(d -> promiseFactory.executor().execute(() -> { + try { + if (eventStream + .handleEvent(PushEvent.data(d)) < 0) { + PushEvent<R> close = PushEvent.close(); + eventStream.close(close); + // Upstream close is needed as we have no direct + // backpressure + upstreamClose(close); + } + } finally { + s.release(); + } + })).onFailure(t -> promiseFactory.executor().execute(() -> { + PushEvent<T> error = PushEvent.error(t); + close(error); + // Upstream close is needed as we have no direct + // backpressure + upstreamClose(error); + })); + + // The number active before was one less than the active number + int activePromises = Math.max(0, n - s.availablePermits() - 1); + return (activePromises + s.getQueueLength()) * delay; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + + return eventStream; + } + + @Override public <R> PushStream<R> flatMap( Function< ? super T, ? extends PushStream< ? extends R>> mapper) { AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); PushEventConsumer<R> consumer = e -> { switch (e.getType()) { @@ -320,7 +374,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public PushStream<T> sorted(Comparator< ? super T> comparator) { List<T> list = Collections.synchronizedList(new ArrayList<>()); AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); updateNext(event -> { try { switch(event.getType()) { @@ -355,7 +409,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { throw new IllegalArgumentException("The limit must be greater than zero"); } AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); AtomicLong counter = new AtomicLong(maxSize); updateNext(event -> { try { @@ -381,11 +435,12 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public PushStream<T> limit(Duration maxTime) { - Runnable start = () -> executors.schedule(() -> close(), + Runnable start = () -> promiseFactory.scheduledExecutor().schedule( + () -> close(), maxTime.toNanos(), NANOSECONDS); AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>( - psp, executors, this) { + psp, promiseFactory, this) { @Override protected void beginning() { start.run(); @@ -409,11 +464,12 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { long timeout = maxTime.toNanos(); AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>( - psp, executors, this) { + psp, promiseFactory, this) { @Override protected void beginning() { lastTime.set(System.nanoTime()); - executors.schedule(() -> check(lastTime, timeout), timeout, + promiseFactory.scheduledExecutor().schedule( + () -> check(lastTime, timeout), timeout, NANOSECONDS); } }; @@ -434,7 +490,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { long elapsed = now - lastTime.get(); if (elapsed < timeout) { - executors.schedule(() -> check(lastTime, timeout), + promiseFactory.scheduledExecutor().schedule( + () -> check(lastTime, timeout), timeout - elapsed, NANOSECONDS); } else { PushEvent<T> error = PushEvent.error(new TimeoutException()); @@ -451,7 +508,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { "The number to skip must be greater than or equal to zero"); } AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); AtomicLong counter = new AtomicLong(n); updateNext(event -> { try { @@ -475,7 +532,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public PushStream<T> fork(int n, int delay, Executor ex) { AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, new PushStreamExecutors(ex, executors.scheduledExecutor()), + psp, new PromiseFactory(Objects.requireNonNull(ex), + promiseFactory.scheduledExecutor()), this); Semaphore s = new Semaphore(n); updateNext(event -> { @@ -538,7 +596,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public PushStream<T> merge( PushEventSource< ? extends T> source) { AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); AtomicInteger count = new AtomicInteger(2); PushEventConsumer<T> consumer = event -> { try { @@ -620,7 +678,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @SuppressWarnings("resource") AbstractPushStreamImpl<T> eventStream = new AbstractPushStreamImpl<T>( - psp, executors) { + psp, promiseFactory) { @Override protected boolean begin() { if (closed.compareAndSet(BUILDING, STARTED)) { @@ -660,7 +718,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length); AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length]; for(int i = 0; i < tests.length; i++) { - rsult[i] = new IntermediatePushStreamImpl<>(psp, executors, this); + rsult[i] = new IntermediatePushStreamImpl<>(psp, promiseFactory, this); } Boolean[] array = new Boolean[tests.length]; @@ -716,7 +774,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public PushStream<T> sequential() { AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); Lock lock = new ReentrantLock(); updateNext((event) -> { try { @@ -738,7 +796,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public <R> PushStream<R> coalesce( Function< ? super T,Optional<R>> accumulator) { AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); updateNext((event) -> { try { if (!event.isTerminal()) { @@ -785,7 +843,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @SuppressWarnings("resource") AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>( - psp, executors, this) { + psp, promiseFactory, this) { @Override protected void beginning() { init.run(); @@ -857,7 +915,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public <R> PushStream<R> window(Duration time, Function<Collection<T>,R> f) { - return window(time, executors.executor(), f); + return window(time, promiseFactory.executor(), f); } @Override @@ -876,7 +934,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public <R> PushStream<R> window(Supplier<Duration> time, IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> f) { - return window(time, maxEvents, executors.executor(), f); + return window(time, maxEvents, promiseFactory.executor(), f); } @Override @@ -902,7 +960,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { long windowSize = time.get().toNanos(); previousWindowSize.set(windowSize); - executors.schedule( + promiseFactory.scheduledExecutor().schedule( getWindowTask(p, f, time, maxEvents, lock, count, queueRef, timestamp, counter, previousWindowSize, ex), @@ -914,7 +972,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @SuppressWarnings("resource") AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>( - psp, new PushStreamExecutors(ex, executors.scheduledExecutor()), + psp, new PromiseFactory(Objects.requireNonNull(ex), + promiseFactory.scheduledExecutor()), this) { @Override protected void beginning() { @@ -968,7 +1027,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { long nextWindow = time.get().toNanos(); long backpressure = previousWindowSize.getAndSet(nextWindow) - elapsed; - executors.schedule( + promiseFactory.scheduledExecutor().schedule( getWindowTask(eventStream, f, time, maxEvents, lock, newCount, queueRef, timestamp, counter, previousWindowSize, ex), @@ -1194,7 +1253,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { long nextWindow = time.get().toNanos(); previousWindowSize.set(nextWindow); queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt())); - executors.schedule( + promiseFactory.scheduledExecutor().schedule( getWindowTask(eventStream, f, time, maxEvents, lock, expectedCounter + 1, queueRef, timestamp, counter, previousWindowSize, executor), @@ -1226,7 +1285,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public PushStream<T> adjustBackPressure(LongUnaryOperator adjustment) { AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); updateNext(event -> { try { long bp = eventStream.handleEvent(event); @@ -1247,7 +1306,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public PushStream<T> adjustBackPressure( ToLongBiFunction<T,Long> adjustment) { AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, executors, this); + psp, promiseFactory, this); updateNext(event -> { try { long bp = eventStream.handleEvent(event); @@ -1268,7 +1327,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<Void> forEach(Consumer< ? super T> action) { - Deferred<Void> d = executors.deferred(); + Deferred<Void> d = promiseFactory.deferred(); updateNext((event) -> { try { switch(event.getType()) { @@ -1307,7 +1366,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) { - Deferred<T> d = executors.deferred(); + Deferred<T> d = promiseFactory.deferred(); AtomicReference<T> iden = new AtomicReference<T>(identity); updateNext(event -> { @@ -1336,7 +1395,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) { - Deferred<Optional<T>> d = executors.deferred(); + Deferred<Optional<T>> d = promiseFactory.deferred(); AtomicReference<T> iden = new AtomicReference<T>(null); updateNext(event -> { @@ -1366,7 +1425,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { - Deferred<U> d = executors.deferred(); + Deferred<U> d = promiseFactory.deferred(); AtomicReference<U> iden = new AtomicReference<>(identity); updateNext(event -> { @@ -1397,7 +1456,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) { A result = collector.supplier().get(); BiConsumer<A, ? super T> accumulator = collector.accumulator(); - Deferred<R> d = executors.deferred(); + Deferred<R> d = promiseFactory.deferred(); PushEventConsumer<T> consumer; if (collector.characteristics() @@ -1464,7 +1523,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<Long> count() { - Deferred<Long> d = executors.deferred(); + Deferred<Long> d = promiseFactory.deferred(); LongAdder counter = new LongAdder(); updateNext((event) -> { try { @@ -1510,7 +1569,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<Optional<T>> findFirst() { - Deferred<Optional<T>> d = executors.deferred(); + Deferred<Optional<T>> d = promiseFactory.deferred(); updateNext((event) -> { try { Optional<T> o = null; @@ -1544,7 +1603,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) { - Deferred<Long> d = executors.deferred(); + Deferred<Long> d = promiseFactory.deferred(); LongAdder la = new LongAdder(); updateNext((event) -> { try { diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java index 7a8d1636e..dfb9eeeaa 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java @@ -25,6 +25,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import org.osgi.util.promise.PromiseFactory; + class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> { @@ -46,11 +48,11 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> private final int parallelism; BufferedPushStreamImpl(PushStreamProvider psp, - PushStreamExecutors executors, U eventQueue, int parallelism, + PromiseFactory promiseFactory, U eventQueue, int parallelism, QueuePolicy<T,U> queuePolicy, PushbackPolicy<T,U> pushbackPolicy, Function<PushEventConsumer<T>,AutoCloseable> connector) { - super(psp, executors, connector); + super(psp, promiseFactory, connector); this.eventQueue = eventQueue; this.parallelism = parallelism; this.semaphore = new Semaphore(parallelism); @@ -85,7 +87,7 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> } private void startWorker() { - executors.execute(() -> { + promiseFactory.executor().execute(() -> { try { PushEvent< ? extends T> event; while ((event = eventQueue.poll()) != null) { @@ -99,7 +101,8 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> close(); return; } else if(backpressure > 0) { - executors.schedule(this::startWorker, backpressure, + promiseFactory.scheduledExecutor().schedule( + this::startWorker, backpressure, MILLISECONDS); return; } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java index 678751215..c939c4a6e 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java @@ -18,15 +18,17 @@ package org.osgi.util.pushstream; import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*; +import org.osgi.util.promise.PromiseFactory; + class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T> implements PushStream<T> { private final AbstractPushStreamImpl< ? > previous; IntermediatePushStreamImpl(PushStreamProvider psp, - PushStreamExecutors executors, + PromiseFactory promiseFactory, AbstractPushStreamImpl< ? > previous) { - super(psp, executors); + super(psp, promiseFactory); this.previous = previous; } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java index 57a3bb99a..574d65590 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java @@ -90,7 +90,7 @@ public abstract class PushEvent<T> { * @throws IllegalStateException if this event is not an * {@link EventType#ERROR} event. */ - public Exception getFailure() throws IllegalStateException { + public Throwable getFailure() throws IllegalStateException { throw new IllegalStateException( "Not an ERROR event, the event type is " + getType()); } @@ -119,11 +119,11 @@ public abstract class PushEvent<T> { * Create a new error event. * * @param <T> The payload type. - * @param e The error. + * @param t The error. * @return A new error event with the specified error. */ - public static <T> PushEvent<T> error(Exception e) { - return new ErrorEvent<T>(e); + public static <T> PushEvent<T> error(Throwable t) { + return new ErrorEvent<T>(t); } /** @@ -182,14 +182,14 @@ public abstract class PushEvent<T> { } static final class ErrorEvent<T> extends PushEvent<T> { - private final Exception error; + private final Throwable error; - ErrorEvent(Exception error) { + ErrorEvent(Throwable error) { this.error = error; } @Override - public Exception getFailure() { + public Throwable getFailure() { return error; } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java index 6d5953e34..071c9ec7d 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java @@ -104,6 +104,26 @@ public interface PushStream<T> extends AutoCloseable { <R> PushStream<R> map(Function< ? super T, ? extends R> mapper); /** + * Asynchronously map the payload values. The mapping function returns a + * Promise representing the asynchronous mapping operation. + * <p> + * The PushStream limits the number of concurrently running mapping + * operations, and returns back pressure based on the number of existing + * queued operations. + * + * @param n number of simultaneous promises to use + * @param delay Nr of ms/promise that is queued back pressure + * @param mapper The mapping function + * @return Builder style (can be a new or the same object) + * @throws IllegalArgumentException if the number of threads is < 1 or + * the delay is < 0 + * @throws NullPointerException if the mapper is null + */ + <R> PushStream<R> asyncMap(int n, int delay, + Function< ? super T,Promise< ? extends R>> mapper) + throws IllegalArgumentException, NullPointerException; + + /** * Flat map the payload value (turn one event into 0..n events of * potentially another type). * diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java deleted file mode 100644 index 7c2509eef..000000000 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) OSGi Alliance (2017). All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.osgi.util.pushstream; - -import static java.util.Objects.requireNonNull; - -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.osgi.util.promise.PromiseExecutors; - -class PushStreamExecutors extends PromiseExecutors { - PushStreamExecutors(Executor executor, ScheduledExecutorService scheduler) { - super(requireNonNull(executor), requireNonNull(scheduler)); - } - - void execute(Runnable operation) { - executor().execute(operation); - } - - ScheduledFuture< ? > schedule(Runnable operation, long delay, - TimeUnit unit) { - return scheduledExecutor().schedule(operation, delay, unit); - } - - @Override - protected Executor executor() { - return super.executor(); - } - - @Override - protected ScheduledExecutorService scheduledExecutor() { - return super.scheduledExecutor(); - } -} diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java index c7d861bf3..ecd8bf4c4 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java @@ -23,6 +23,7 @@ import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR; import static org.osgi.util.pushstream.QueuePolicyOption.FAIL; import java.util.Iterator; +import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; @@ -36,6 +37,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; +import org.osgi.util.promise.PromiseFactory; + /** * A factory for {@link PushStream} instances, and utility methods for handling * {@link PushEventSource}s and {@link PushEventConsumer}s @@ -164,7 +167,7 @@ public final class PushStreamProvider { workerToUse = Executors.newFixedThreadPool(parallelism); closeExecutorOnClose = true; } else { - workerToUse = executor; + workerToUse = Objects.requireNonNull(executor); closeExecutorOnClose = false; } @@ -174,7 +177,7 @@ public final class PushStreamProvider { timerToUse = acquireScheduler(); releaseSchedulerOnClose = true; } else { - timerToUse = scheduler; + timerToUse = Objects.requireNonNull(scheduler); releaseSchedulerOnClose = false; } @@ -191,7 +194,7 @@ public final class PushStreamProvider { } PushStream<T> stream = new BufferedPushStreamImpl<>(this, - new PushStreamExecutors(workerToUse, timerToUse), queue, + new PromiseFactory(workerToUse, timerToUse), queue, parallelism, queuePolicy, pushbackPolicy, aec -> { try { @@ -231,7 +234,7 @@ public final class PushStreamProvider { workerToUse = Executors.newFixedThreadPool(2); closeExecutorOnClose = true; } else { - workerToUse = executor; + workerToUse = Objects.requireNonNull(executor); closeExecutorOnClose = false; } @@ -241,11 +244,11 @@ public final class PushStreamProvider { timerToUse = acquireScheduler(); releaseSchedulerOnClose = true; } else { - timerToUse = scheduler; + timerToUse = Objects.requireNonNull(scheduler); releaseSchedulerOnClose = false; } PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, - new PushStreamExecutors(workerToUse, timerToUse), + new PromiseFactory(workerToUse, timerToUse), aec -> { try { return eventSource.open(aec); @@ -533,7 +536,7 @@ public final class PushStreamProvider { toUse = Executors.newFixedThreadPool(parallelism); closeExecutorOnClose = true; } else { - toUse = executor; + toUse = Objects.requireNonNull(executor); closeExecutorOnClose = false; } @@ -546,7 +549,7 @@ public final class PushStreamProvider { } SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>( - new PushStreamExecutors(toUse, acquireScheduler()), queuePolicy, + new PromiseFactory(toUse, acquireScheduler()), queuePolicy, queue, parallelism, () -> { try { @@ -712,7 +715,7 @@ public final class PushStreamProvider { workerToUse = Executors.newFixedThreadPool(2); closeExecutorOnClose = true; } else { - workerToUse = executor; + workerToUse = Objects.requireNonNull(executor); closeExecutorOnClose = false; } @@ -722,12 +725,12 @@ public final class PushStreamProvider { timerToUse = acquireScheduler(); releaseSchedulerOnClose = true; } else { - timerToUse = scheduler; + timerToUse = Objects.requireNonNull(scheduler); releaseSchedulerOnClose = false; } PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>( - this, new PushStreamExecutors(workerToUse, timerToUse), aec -> { + this, new PromiseFactory(workerToUse, timerToUse), aec -> { return () -> { /* No action to take */ }; }) { @@ -736,7 +739,7 @@ public final class PushStreamProvider { if (super.begin()) { Iterator<T> it = items.iterator(); - executors.execute(() -> pushData(it)); + promiseFactory.executor().execute(() -> pushData(it)); return true; } @@ -753,8 +756,9 @@ public final class PushStreamProvider { close(); return; } else { - executors.schedule( - () -> executors + promiseFactory.scheduledExecutor() + .schedule( + () -> promiseFactory.executor() .execute(() -> pushData(it)), returnValue, MILLISECONDS); return; diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java index f47bcb1e7..314ae0830 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java @@ -76,9 +76,9 @@ public interface SimplePushEventSource<T> * {@link #open(PushEventConsumer)} this source, and will receive subsequent * events. * - * @param e the error + * @param t the error */ - void error(Exception e); + void error(Throwable t); /** * Determine whether there are any {@link PushEventConsumer}s for this diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java index 094a580ab..478d0e4a3 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java @@ -28,15 +28,15 @@ import java.util.concurrent.Semaphore; import org.osgi.util.promise.Deferred; import org.osgi.util.promise.Promise; -import org.osgi.util.promise.Promises; +import org.osgi.util.promise.PromiseFactory; class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> implements SimplePushEventSource<T> { private final Object lock = new Object(); - private final PushStreamExecutors executors; - private final PushStreamExecutors sameThread; + private final PromiseFactory promiseFactory; + private final PromiseFactory sameThread; private final QueuePolicy<T,U> queuePolicy; @@ -57,13 +57,13 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends private boolean waitForFinishes; - public SimplePushEventSourceImpl(PushStreamExecutors executors, + public SimplePushEventSourceImpl(PromiseFactory promiseFactory, QueuePolicy<T,U> queuePolicy, U queue, int parallelism, Runnable onClose) { - this.executors = executors; - this.sameThread = new PushStreamExecutors( - PushStreamExecutors.inlineExecutor(), - executors.scheduledExecutor()); + this.promiseFactory = promiseFactory; + this.sameThread = new PromiseFactory( + PromiseFactory.inlineExecutor(), + promiseFactory.scheduledExecutor()); this.queuePolicy = queuePolicy; this.queue = queue; this.parallelism = parallelism; @@ -111,7 +111,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) { try { - executors.execute(() -> safePush(pec, event)); + promiseFactory.executor().execute(() -> safePush(pec, event)); } catch (RejectedExecutionException ree) { // TODO log? if (!event.isTerminal()) { @@ -126,7 +126,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends PushEventConsumer< ? super T> pec, PushEvent<T> event) { Deferred<Long> d = sameThread.deferred(); try { - executors.execute( + promiseFactory.executor().execute( () -> d.resolve(Long.valueOf( System.nanoTime() + safePush(pec, event)))); } catch (RejectedExecutionException ree) { @@ -206,8 +206,8 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends } @Override - public void error(Exception e) { - enqueueEvent(PushEvent.error(e)); + public void error(Throwable t) { + enqueueEvent(PushEvent.error(t)); } private void enqueueEvent(PushEvent<T> event) { @@ -237,7 +237,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends "unchecked", "boxing" }) private void startWorker() { - executors.execute(() -> { + promiseFactory.executor().execute(() -> { try { for(;;) { @@ -287,7 +287,8 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends - System.nanoTime(); if (toWait > 0) { - executors.schedule(this::startWorker, toWait, + promiseFactory.scheduledExecutor().schedule( + this::startWorker, toWait, NANOSECONDS); return; } @@ -297,14 +298,15 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends long toWait = p.getValue() - System.nanoTime(); if (toWait > 0) { - executors.schedule(this::startWorker, toWait, + promiseFactory.scheduledExecutor().schedule( + this::startWorker, toWait, NANOSECONDS); } else { startWorker(); } return p; }, p -> close( - PushEvent.error((Exception) p.getFailure()))); + PushEvent.error(p.getFailure()))); return; } } @@ -346,8 +348,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends return doCall(event, pec); } }).collect(toList()); - return Promises - .all(sameThread.deferred(), calls) + return sameThread.all(calls) .map(l -> l.stream().max(Long::compareTo).orElseGet( () -> Long.valueOf(System.nanoTime()))); } @@ -375,17 +376,17 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends if (connected.isEmpty()) { if (connectPromise == null) { - connectPromise = executors.deferred(); + connectPromise = promiseFactory.deferred(); } return connectPromise.getPromise(); } else { - return executors.resolved(null); + return promiseFactory.resolved(null); } } } private Promise<Void> closedConnectPromise() { - return executors.failed(new IllegalStateException( + return promiseFactory.failed(new IllegalStateException( "This SimplePushEventSource is closed")); } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java index 53400c4c2..eb3e93350 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java @@ -23,6 +23,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import org.osgi.util.promise.PromiseFactory; + class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> extends AbstractPushStreamImpl<T> implements PushStream<T> { @@ -31,9 +33,9 @@ class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T protected final AtomicReference<AutoCloseable> upstream = new AtomicReference<AutoCloseable>(); UnbufferedPushStreamImpl(PushStreamProvider psp, - PushStreamExecutors executors, + PromiseFactory promiseFactory, Function<PushEventConsumer<T>,AutoCloseable> connector) { - super(psp, executors); + super(psp, promiseFactory); this.connector = connector; } |