diff options
author | Thomas Watson | 2017-11-13 18:53:17 +0000 |
---|---|---|
committer | Thomas Watson | 2017-11-13 19:03:55 +0000 |
commit | b2a818de0268db42a685fcbdb413407f1e4f0232 (patch) | |
tree | 9695ba715ae24afe5e4bf91e5217a9f6b142ae71 | |
parent | c51bc3e5f7a5378d3a315d48a96128d110d6a1da (diff) | |
download | rt.equinox.bundles-b2a818de0268db42a685fcbdb413407f1e4f0232.tar.gz rt.equinox.bundles-b2a818de0268db42a685fcbdb413407f1e4f0232.tar.xz rt.equinox.bundles-b2a818de0268db42a685fcbdb413407f1e4f0232.zip |
Bug 527221 - [osgi R7] update log stream impl and APII20171113-2000
Change-Id: Ic50dfd9847d8a694c3c49dbe88dded6862d44f3f
Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
14 files changed, 750 insertions, 201 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java index a37e407fd..749efc1a3 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java @@ -1,16 +1,34 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent< ? extends T>>> implements BufferBuilder<R,T,U> { - protected Executor worker; - protected int concurrency; - protected PushbackPolicy<T,U> backPressure; - protected QueuePolicy<T,U> bufferingPolicy; - protected U buffer; + protected Executor worker; + protected ScheduledExecutorService timer; + protected int concurrency; + protected PushbackPolicy<T,U> backPressure; + protected QueuePolicy<T,U> bufferingPolicy; + protected U buffer; @Override public BufferBuilder<R,T,U> withBuffer(U queue) { @@ -57,4 +75,11 @@ abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent< ? this.worker = executor; return this; } + + @Override + public BufferBuilder<R,T,U> withScheduler( + ScheduledExecutorService scheduler) { + this.timer = scheduler; + return this; + } } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java index d64ac129d..ef8bb1420 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import static java.util.Collections.emptyList; @@ -23,7 +39,6 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -39,7 +54,9 @@ import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.IntFunction; import java.util.function.IntSupplier; +import java.util.function.LongUnaryOperator; import java.util.function.Supplier; +import java.util.function.ToLongBiFunction; import java.util.stream.Collector; import java.util.stream.Collectors; @@ -60,8 +77,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { protected final PushStreamProvider psp; - protected final Executor defaultExecutor; - protected final ScheduledExecutorService scheduler; + protected final PushStreamExecutors executors; protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING); @@ -75,10 +91,9 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { protected abstract void upstreamClose(PushEvent< ? > close); AbstractPushStreamImpl(PushStreamProvider psp, - Executor executor, ScheduledExecutorService scheduler) { + PushStreamExecutors executors) { this.psp = psp; - this.defaultExecutor = executor; - this.scheduler = scheduler; + this.executors = executors; } protected long handleEvent(PushEvent< ? extends T> event) { @@ -201,7 +216,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public PushStream<T> filter(Predicate< ? super T> predicate) { AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, defaultExecutor, scheduler, this); + psp, executors, this); updateNext((event) -> { try { if (!event.isTerminal()) { @@ -224,7 +239,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) { AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>( - psp, defaultExecutor, scheduler, this); + psp, executors, this); updateNext(event -> { try { if (!event.isTerminal()) { @@ -245,7 +260,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public <R> PushStream<R> flatMap( Function< ? super T, ? extends PushStream< ? extends R>> mapper) { AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>( - psp, defaultExecutor, scheduler, this); + psp, executors, this); PushEventConsumer<R> consumer = e -> { switch (e.getType()) { @@ -305,7 +320,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public PushStream<T> sorted(Comparator< ? super T> comparator) { List<T> list = Collections.synchronizedList(new ArrayList<>()); AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, defaultExecutor, scheduler, this); + psp, executors, this); updateNext(event -> { try { switch(event.getType()) { @@ -340,7 +355,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { throw new IllegalArgumentException("The limit must be greater than zero"); } AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, defaultExecutor, scheduler, this); + psp, executors, this); AtomicLong counter = new AtomicLong(maxSize); updateNext(event -> { try { @@ -366,11 +381,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public PushStream<T> limit(Duration maxTime) { - Runnable start = () -> scheduler.schedule(() -> close(), + Runnable start = () -> executors.schedule(() -> close(), maxTime.toNanos(), NANOSECONDS); AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>( - psp, defaultExecutor, scheduler, this) { + psp, executors, this) { @Override protected void beginning() { start.run(); @@ -394,11 +409,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { long timeout = maxTime.toNanos(); AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>( - psp, defaultExecutor, scheduler, this) { + psp, executors, this) { @Override protected void beginning() { lastTime.set(System.nanoTime()); - scheduler.schedule(() -> check(lastTime, timeout), timeout, + executors.schedule(() -> check(lastTime, timeout), timeout, NANOSECONDS); } }; @@ -419,7 +434,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { long elapsed = now - lastTime.get(); if (elapsed < timeout) { - scheduler.schedule(() -> check(lastTime, timeout), + executors.schedule(() -> check(lastTime, timeout), timeout - elapsed, NANOSECONDS); } else { PushEvent<T> error = PushEvent.error(new TimeoutException()); @@ -436,7 +451,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { "The number to skip must be greater than or equal to zero"); } AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, defaultExecutor, scheduler, this); + psp, executors, this); AtomicLong counter = new AtomicLong(n); updateNext(event -> { try { @@ -460,7 +475,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public PushStream<T> fork(int n, int delay, Executor ex) { AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, ex, scheduler, this); + psp, new PushStreamExecutors(ex, executors.scheduledExecutor()), + this); Semaphore s = new Semaphore(n); updateNext(event -> { try { @@ -522,7 +538,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public PushStream<T> merge( PushEventSource< ? extends T> source) { AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, defaultExecutor, scheduler, this); + psp, executors, this); AtomicInteger count = new AtomicInteger(2); PushEventConsumer<T> consumer = event -> { try { @@ -604,7 +620,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @SuppressWarnings("resource") AbstractPushStreamImpl<T> eventStream = new AbstractPushStreamImpl<T>( - psp, defaultExecutor, scheduler) { + psp, executors) { @Override protected boolean begin() { if (closed.compareAndSet(BUILDING, STARTED)) { @@ -644,8 +660,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length); AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length]; for(int i = 0; i < tests.length; i++) { - rsult[i] = new IntermediatePushStreamImpl<>(psp, defaultExecutor, - scheduler, this); + rsult[i] = new IntermediatePushStreamImpl<>(psp, executors, this); } Boolean[] array = new Boolean[tests.length]; @@ -701,7 +716,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public PushStream<T> sequential() { AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( - psp, defaultExecutor, scheduler, this); + psp, executors, this); Lock lock = new ReentrantLock(); updateNext((event) -> { try { @@ -723,7 +738,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public <R> PushStream<R> coalesce( Function< ? super T,Optional<R>> accumulator) { AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>( - psp, defaultExecutor, scheduler, this); + psp, executors, this); updateNext((event) -> { try { if (!event.isTerminal()) { @@ -770,7 +785,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @SuppressWarnings("resource") AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>( - psp, defaultExecutor, scheduler, this) { + psp, executors, this) { @Override protected void beginning() { init.run(); @@ -842,7 +857,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public <R> PushStream<R> window(Duration time, Function<Collection<T>,R> f) { - return window(time, defaultExecutor, f); + return window(time, executors.executor(), f); } @Override @@ -861,7 +876,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public <R> PushStream<R> window(Supplier<Duration> time, IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> f) { - return window(time, maxEvents, defaultExecutor, f); + return window(time, maxEvents, executors.executor(), f); } @Override @@ -887,7 +902,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { long windowSize = time.get().toNanos(); previousWindowSize.set(windowSize); - scheduler.schedule( + executors.schedule( getWindowTask(p, f, time, maxEvents, lock, count, queueRef, timestamp, counter, previousWindowSize, ex), @@ -899,7 +914,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @SuppressWarnings("resource") AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>( - psp, ex, scheduler, this) { + psp, new PushStreamExecutors(ex, executors.scheduledExecutor()), + this) { @Override protected void beginning() { begin.accept(this); @@ -952,7 +968,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { long nextWindow = time.get().toNanos(); long backpressure = previousWindowSize.getAndSet(nextWindow) - elapsed; - scheduler.schedule( + executors.schedule( getWindowTask(eventStream, f, time, maxEvents, lock, newCount, queueRef, timestamp, counter, previousWindowSize, ex), @@ -1178,7 +1194,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { long nextWindow = time.get().toNanos(); previousWindowSize.set(nextWindow); queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt())); - scheduler.schedule( + executors.schedule( getWindowTask(eventStream, f, time, maxEvents, lock, expectedCounter + 1, queueRef, timestamp, counter, previousWindowSize, executor), @@ -1208,8 +1224,51 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { } @Override + public PushStream<T> adjustBackPressure(LongUnaryOperator adjustment) { + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( + psp, executors, this); + updateNext(event -> { + try { + long bp = eventStream.handleEvent(event); + if (event.isTerminal()) { + return ABORT; + } else { + return bp < 0 ? bp : adjustment.applyAsLong(bp); + } + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override + public PushStream<T> adjustBackPressure( + ToLongBiFunction<T,Long> adjustment) { + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( + psp, executors, this); + updateNext(event -> { + try { + long bp = eventStream.handleEvent(event); + if (event.isTerminal()) { + return ABORT; + } else { + return bp < 0 ? bp + : adjustment.applyAsLong(event.getData(), + Long.valueOf(bp)); + } + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override public Promise<Void> forEach(Consumer< ? super T> action) { - Deferred<Void> d = new Deferred<>(); + Deferred<Void> d = executors.deferred(); updateNext((event) -> { try { switch(event.getType()) { @@ -1223,10 +1282,10 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { d.fail(event.getFailure()); break; } - close(event.nodata()); + close(event.nodata()); return ABORT; } catch (Exception e) { - d.fail(e); + close(PushEvent.error(e)); return ABORT; } }); @@ -1248,7 +1307,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) { - Deferred<T> d = new Deferred<>(); + Deferred<T> d = executors.deferred(); AtomicReference<T> iden = new AtomicReference<T>(identity); updateNext(event -> { @@ -1277,7 +1336,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) { - Deferred<Optional<T>> d = new Deferred<>(); + Deferred<Optional<T>> d = executors.deferred(); AtomicReference<T> iden = new AtomicReference<T>(null); updateNext(event -> { @@ -1307,7 +1366,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { - Deferred<U> d = new Deferred<>(); + Deferred<U> d = executors.deferred(); AtomicReference<U> iden = new AtomicReference<>(identity); updateNext(event -> { @@ -1338,7 +1397,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) { A result = collector.supplier().get(); BiConsumer<A, ? super T> accumulator = collector.accumulator(); - Deferred<R> d = new Deferred<>(); + Deferred<R> d = executors.deferred(); PushEventConsumer<T> consumer; if (collector.characteristics() @@ -1405,7 +1464,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<Long> count() { - Deferred<Long> d = new Deferred<>(); + Deferred<Long> d = executors.deferred(); LongAdder counter = new LongAdder(); updateNext((event) -> { try { @@ -1451,7 +1510,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<Optional<T>> findFirst() { - Deferred<Optional<T>> d = new Deferred<>(); + Deferred<Optional<T>> d = executors.deferred(); updateNext((event) -> { try { Optional<T> o = null; @@ -1485,7 +1544,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) { - Deferred<Long> d = new Deferred<>(); + Deferred<Long> d = executors.deferred(); LongAdder la = new LongAdder(); updateNext((event) -> { try { @@ -1497,15 +1556,17 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { case CLOSE: try { action.accept(event); - } finally { d.resolve(Long.valueOf(la.sum())); + } catch (Exception e) { + d.fail(e); } break; case ERROR: try { action.accept(event); - } finally { d.fail(event.getFailure()); + } catch (Exception e) { + d.fail(e); } break; } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java index dfcf8b503..826c730ea 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java @@ -1,7 +1,24 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import org.osgi.annotation.versioning.ProviderType; @@ -73,6 +90,15 @@ public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends * @return this builder */ BufferBuilder<R, T, U> withExecutor(Executor executor); + + /** + * Set the {@link ScheduledExecutorService} that should be used to trigger + * timed events after this buffer + * + * @param scheduler + * @return this builder + */ + BufferBuilder<R,T,U> withScheduler(ScheduledExecutorService scheduler); /** * @return the object being built diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java index 7cedafb5c..7a8d1636e 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -5,8 +21,6 @@ import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED; import static org.osgi.util.pushstream.PushEventConsumer.ABORT; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -18,8 +32,6 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> private final Semaphore semaphore; - private final Executor worker; - private final QueuePolicy<T, U> queuePolicy; private final PushbackPolicy<T, U> pushbackPolicy; @@ -34,15 +46,14 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> private final int parallelism; BufferedPushStreamImpl(PushStreamProvider psp, - ScheduledExecutorService scheduler, U eventQueue, - int parallelism, Executor worker, QueuePolicy<T,U> queuePolicy, + PushStreamExecutors executors, U eventQueue, int parallelism, + QueuePolicy<T,U> queuePolicy, PushbackPolicy<T,U> pushbackPolicy, Function<PushEventConsumer<T>,AutoCloseable> connector) { - super(psp, worker, scheduler, connector); + super(psp, executors, connector); this.eventQueue = eventQueue; this.parallelism = parallelism; this.semaphore = new Semaphore(parallelism); - this.worker = worker; this.queuePolicy = queuePolicy; this.pushbackPolicy = pushbackPolicy; } @@ -74,7 +85,7 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> } private void startWorker() { - worker.execute(() -> { + executors.execute(() -> { try { PushEvent< ? extends T> event; while ((event = eventQueue.poll()) != null) { @@ -88,12 +99,12 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> close(); return; } else if(backpressure > 0) { - scheduler.schedule(this::startWorker, backpressure, + executors.schedule(this::startWorker, backpressure, MILLISECONDS); return; } } - + // Only release this now the queue is empty semaphore.release(); } catch (Exception e) { close(PushEvent.error(e)); diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java index 320adeebc..678751215 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java @@ -1,19 +1,32 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; - class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T> implements PushStream<T> { private final AbstractPushStreamImpl< ? > previous; IntermediatePushStreamImpl(PushStreamProvider psp, - Executor executor, ScheduledExecutorService scheduler, + PushStreamExecutors executors, AbstractPushStreamImpl< ? > previous) { - super(psp, executor, scheduler); + super(psp, executors); this.previous = previous; } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java index 968f668f6..57a3bb99a 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java index 1471a84f9..6d5953e34 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,9 @@ import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.IntFunction; import java.util.function.IntSupplier; +import java.util.function.LongUnaryOperator; import java.util.function.Supplier; +import java.util.function.ToLongBiFunction; import java.util.stream.Collector; import org.osgi.annotation.versioning.ProviderType; @@ -187,8 +189,8 @@ public interface PushStream<T> extends AutoCloseable { * @param delay Nr of ms/thread that is queued back pressure * @param e an executor to use for the background threads. * @return Builder style (can be a new or the same object) - * @throws IllegalArgumentException if the number of threads is < 1 or the - * delay is < 0 + * @throws IllegalArgumentException if the number of threads is < 1 or + * the delay is < 0 * @throws NullPointerException if the Executor is null */ PushStream<T> fork(int n, int delay, Executor e) @@ -430,6 +432,33 @@ public interface PushStream<T> extends AutoCloseable { BiFunction<Long,Collection<T>,R> f); /** + * Changes the back-pressure propagated by this pipeline stage. + * <p> + * The supplied function receives the back pressure returned by the next + * pipeline stage and returns the back pressure that should be returned by + * this stage. This function will not be called if the previous pipeline + * stage returns negative back pressure. + * + * @param adjustment + * @return Builder style (can be a new or the same object) + */ + PushStream<T> adjustBackPressure(LongUnaryOperator adjustment); + + /** + * Changes the back-pressure propagated by this pipeline stage. + * <p> + * The supplied function receives the data object passed to the next + * pipeline stage and the back pressure that was returned by that stage when + * accepting it. The function returns the back pressure that should be + * returned by this stage. This function will not be called if the previous + * pipeline stage returns negative back pressure. + * + * @param adjustment + * @return Builder style (can be a new or the same object) + */ + PushStream<T> adjustBackPressure(ToLongBiFunction<T,Long> adjustment); + + /** * Execute the action for each event received until the channel is closed. * This is a terminating method, the returned promise is resolved when the * channel closes. diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java index 506c8f2e7..d773deddf 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java @@ -1,7 +1,24 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import org.osgi.annotation.versioning.ProviderType; @@ -20,7 +37,18 @@ public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? exten /** * Tells this {@link PushStreamBuilder} to create an unbuffered stream which * delivers events directly to its consumer using the incoming delivery - * thread. + * thread. Setting the {@link PushStreamBuilder} to be unbuffered means that + * any buffer, queue policy or push back policy will be ignored. Note that + * calling one of: + * <ul> + * <li>{@link #withBuffer(BlockingQueue)}</li> + * <li>{@link #withQueuePolicy(QueuePolicy)}</li> + * <li>{@link #withQueuePolicy(QueuePolicyOption)}</li> + * <li>{@link #withPushbackPolicy(PushbackPolicy)}</li> + * <li>{@link #withPushbackPolicy(PushbackPolicyOption, long)}</li> + * <li>{@link #withParallelism(int)}</li> + * </ul> + * after this method will reset this builder to require a buffer. * * @return the builder */ @@ -52,4 +80,7 @@ public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? exten @Override PushStreamBuilder<T,U> withExecutor(Executor executor); + + @Override + PushStreamBuilder<T,U> withScheduler(ScheduledExecutorService scheduler); } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java index 0bf8d7127..75fbf07a5 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java @@ -1,22 +1,41 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> extends AbstractBufferBuilder<PushStream<T>,T,U> implements PushStreamBuilder<T,U> { - private final PushStreamProvider psp; + private final PushStreamProvider psp; private final PushEventSource<T> eventSource; private final Executor previousExecutor; + private final ScheduledExecutorService previousScheduler; private boolean unbuffered; PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor, - PushEventSource<T> eventSource) { + ScheduledExecutorService defaultScheduler, PushEventSource<T> eventSource) { this.psp = psp; this.previousExecutor = defaultExecutor; + this.previousScheduler = defaultScheduler; this.eventSource = eventSource; this.worker = defaultExecutor; } @@ -66,11 +85,16 @@ class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> @Override public PushStreamBuilder<T,U> withExecutor(Executor executor) { - unbuffered = false; return (PushStreamBuilder<T,U>) super.withExecutor(executor); } @Override + public PushStreamBuilder<T,U> withScheduler( + ScheduledExecutorService scheduler) { + return (PushStreamBuilder<T,U>) super.withScheduler(scheduler); + } + + @Override public PushStreamBuilder<T,U> unbuffered() { unbuffered = true; return this; @@ -78,10 +102,16 @@ class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> @Override public PushStream<T> build() { + Executor workerToUse = worker == null ? previousExecutor : worker; + ScheduledExecutorService timerToUse = timer == null ? previousScheduler + : timer; + if (unbuffered) { - return psp.createUnbufferedStream(eventSource, previousExecutor); + return psp.createUnbufferedStream(eventSource, workerToUse, + timerToUse); } else { - return psp.createStream(eventSource, concurrency, worker, buffer, + return psp.createStream(eventSource, concurrency, workerToUse, + timerToUse, buffer, bufferingPolicy, backPressure); } } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java new file mode 100644 index 000000000..7c2509eef --- /dev/null +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) OSGi Alliance (2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.osgi.util.promise.PromiseExecutors; + +class PushStreamExecutors extends PromiseExecutors { + PushStreamExecutors(Executor executor, ScheduledExecutorService scheduler) { + super(requireNonNull(executor), requireNonNull(scheduler)); + } + + void execute(Runnable operation) { + executor().execute(operation); + } + + ScheduledFuture< ? > schedule(Runnable operation, long delay, + TimeUnit unit) { + return scheduledExecutor().schedule(operation, delay, unit); + } + + @Override + protected Executor executor() { + return super.executor(); + } + + @Override + protected ScheduledExecutorService scheduledExecutor() { + return super.scheduledExecutor(); + } +} diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java index f63c66210..c7d861bf3 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,11 +25,13 @@ import static org.osgi.util.pushstream.QueuePolicyOption.FAIL; import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; @@ -44,7 +46,7 @@ public final class PushStreamProvider { private int schedulerReferences; - private ScheduledExecutorService scheduler; + private ScheduledExecutorService sharedScheduler; private ScheduledExecutorService acquireScheduler() { try { @@ -53,9 +55,10 @@ public final class PushStreamProvider { schedulerReferences += 1; if (schedulerReferences == 1) { - scheduler = Executors.newSingleThreadScheduledExecutor(); + sharedScheduler = Executors + .newSingleThreadScheduledExecutor(); } - return scheduler; + return sharedScheduler; } finally { lock.unlock(); } @@ -72,8 +75,8 @@ public final class PushStreamProvider { schedulerReferences -= 1; if (schedulerReferences == 0) { - scheduler.shutdown(); - scheduler = null; + sharedScheduler.shutdown(); + sharedScheduler = null; } } finally { lock.unlock(); @@ -112,7 +115,8 @@ public final class PushStreamProvider { * @return A {@link PushStream} with a default initial buffer */ public <T> PushStream<T> createStream(PushEventSource<T> eventSource) { - return createStream(eventSource, 1, null, new ArrayBlockingQueue<>(32), + return createStream(eventSource, 1, null, null, + new ArrayBlockingQueue<>(32), FAIL.getPolicy(), LINEAR.getPolicy(1000)); } @@ -130,7 +134,7 @@ public final class PushStreamProvider { */ public <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildStream( PushEventSource<T> eventSource) { - return new PushStreamBuilderImpl<T,U>(this, null, eventSource); + return new PushStreamBuilderImpl<T,U>(this, null, null, eventSource); } @SuppressWarnings({ @@ -138,7 +142,8 @@ public final class PushStreamProvider { }) <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream( PushEventSource<T> eventSource, int parallelism, Executor executor, - U queue, QueuePolicy<T,U> queuePolicy, + ScheduledExecutorService scheduler, U queue, + QueuePolicy<T,U> queuePolicy, PushbackPolicy<T,U> pushbackPolicy) { if (eventSource == null) { @@ -154,15 +159,25 @@ public final class PushStreamProvider { } boolean closeExecutorOnClose; - Executor toUse; + Executor workerToUse; if (executor == null) { - toUse = Executors.newFixedThreadPool(parallelism); + workerToUse = Executors.newFixedThreadPool(parallelism); closeExecutorOnClose = true; } else { - toUse = executor; + workerToUse = executor; closeExecutorOnClose = false; } + boolean releaseSchedulerOnClose; + ScheduledExecutorService timerToUse; + if (scheduler == null) { + timerToUse = acquireScheduler(); + releaseSchedulerOnClose = true; + } else { + timerToUse = scheduler; + releaseSchedulerOnClose = false; + } + if (queue == null) { queue = (U) new ArrayBlockingQueue(32); } @@ -175,9 +190,9 @@ public final class PushStreamProvider { pushbackPolicy = LINEAR.getPolicy(1000); } - @SuppressWarnings("resource") PushStream<T> stream = new BufferedPushStreamImpl<>(this, - acquireScheduler(), queue, parallelism, toUse, queuePolicy, + new PushStreamExecutors(workerToUse, timerToUse), queue, + parallelism, queuePolicy, pushbackPolicy, aec -> { try { return eventSource.open(aec); @@ -187,31 +202,51 @@ public final class PushStreamProvider { } }); - stream = stream.onClose(() -> { - if (closeExecutorOnClose) { - ((ExecutorService) toUse).shutdown(); - } - releaseScheduler(); - }).map(x -> x); + return cleanupThreads(closeExecutorOnClose, workerToUse, + releaseSchedulerOnClose, stream); + } + + private <T> PushStream<T> cleanupThreads(boolean closeExecutorOnClose, + Executor workerToUse, boolean releaseSchedulerOnClose, + PushStream<T> stream) { + if (closeExecutorOnClose || releaseSchedulerOnClose) { + stream = stream.onClose(() -> { + if (closeExecutorOnClose) { + ((ExecutorService) workerToUse).shutdown(); + } + if (releaseSchedulerOnClose) { + releaseScheduler(); + } + }).map(x -> x); + } return stream; } <T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource, - Executor executor) { + Executor executor, ScheduledExecutorService scheduler) { boolean closeExecutorOnClose; - Executor toUse; + Executor workerToUse; if (executor == null) { - toUse = Executors.newFixedThreadPool(2); + workerToUse = Executors.newFixedThreadPool(2); closeExecutorOnClose = true; } else { - toUse = executor; + workerToUse = executor; closeExecutorOnClose = false; } - @SuppressWarnings("resource") - PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, toUse, - acquireScheduler(), aec -> { + boolean releaseSchedulerOnClose; + ScheduledExecutorService timerToUse; + if (scheduler == null) { + timerToUse = acquireScheduler(); + releaseSchedulerOnClose = true; + } else { + timerToUse = scheduler; + releaseSchedulerOnClose = false; + } + PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, + new PushStreamExecutors(workerToUse, timerToUse), + aec -> { try { return eventSource.open(aec); } catch (Exception e) { @@ -220,14 +255,8 @@ public final class PushStreamProvider { } }); - stream = stream.onClose(() -> { - if (closeExecutorOnClose) { - ((ExecutorService) toUse).shutdown(); - } - releaseScheduler(); - }).map(x -> x); - - return stream; + return cleanupThreads(closeExecutorOnClose, workerToUse, + releaseSchedulerOnClose, stream); } /** @@ -256,9 +285,17 @@ public final class PushStreamProvider { * call to {@link PushEventSource#open(PushEventConsumer)} will begin event * processing. * + * <p> * The {@link PushEventSource} will remain active until the backing stream * is closed, and permits multiple consumers to - * {@link PushEventSource#open(PushEventConsumer)} it. + * {@link PushEventSource#open(PushEventConsumer)} it. Note that this means + * the caller of this method is responsible for closing the supplied + * stream if it is not finite in length. + * + * <p>Late joining + * consumers will not receive historical events, but will immediately + * receive the terminal event which closed the stream if the stream is + * already closed. * * @param stream * @@ -266,26 +303,174 @@ public final class PushStreamProvider { */ public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream( PushStream<T> stream) { - return new AbstractBufferBuilder<PushEventSource<T>,T,U>() { + BufferBuilder<PushStream<T>,T,U> builder = stream.buildBuffer(); + + return new BufferBuilder<PushEventSource<T>,T,U>() { + + @Override + public BufferBuilder<PushEventSource<T>,T,U> withBuffer(U queue) { + builder.withBuffer(queue); + return this; + } + + @Override + public BufferBuilder<PushEventSource<T>,T,U> withQueuePolicy( + QueuePolicy<T,U> queuePolicy) { + builder.withQueuePolicy(queuePolicy); + return this; + } + + @Override + public BufferBuilder<PushEventSource<T>,T,U> withQueuePolicy( + QueuePolicyOption queuePolicyOption) { + builder.withQueuePolicy(queuePolicyOption); + return this; + } + + @Override + public BufferBuilder<PushEventSource<T>,T,U> withPushbackPolicy( + PushbackPolicy<T,U> pushbackPolicy) { + builder.withPushbackPolicy(pushbackPolicy); + return this; + } + + @Override + public BufferBuilder<PushEventSource<T>,T,U> withPushbackPolicy( + PushbackPolicyOption pushbackPolicyOption, long time) { + builder.withPushbackPolicy(pushbackPolicyOption, time); + return this; + } + + @Override + public BufferBuilder<PushEventSource<T>,T,U> withParallelism( + int parallelism) { + builder.withParallelism(parallelism); + return this; + } + + @Override + public BufferBuilder<PushEventSource<T>,T,U> withExecutor( + Executor executor) { + builder.withExecutor(executor); + return this; + } + + @Override + public BufferBuilder<PushEventSource<T>,T,U> withScheduler( + ScheduledExecutorService scheduler) { + builder.withScheduler(scheduler); + return this; + } + @Override public PushEventSource<T> build() { - SimplePushEventSource<T> spes = createSimplePushEventSource( - concurrency, worker, buffer, bufferingPolicy, () -> { + + AtomicBoolean connect = new AtomicBoolean(); + AtomicReference<PushEvent<T>> terminalEvent = new AtomicReference<>(); + + CopyOnWriteArrayList<PushEventConsumer< ? super T>> consumers = new CopyOnWriteArrayList<>(); + + return consumer -> { + + consumers.add(consumer); + + PushEvent<T> terminal = terminalEvent.get(); + if (terminal != null) { + if (consumers.remove(consumer)) { + // The stream is already done and we missed it + consumer.accept(terminal); + } + return () -> { + //Nothing to do, we have already sent the terminal event + }; + } + + if(!connect.getAndSet(true)) { + // connect + builder.build() + .forEachEvent(new MultiplexingConsumer<T>( + terminalEvent, consumers)); + } + + return () -> { + if (consumers.remove(consumer)) { try { - stream.close(); - } catch (Exception e) { + consumer.accept(PushEvent.close()); + } catch (Exception ex) { // TODO Auto-generated catch block - e.printStackTrace(); + ex.printStackTrace(); } - }); - spes.connectPromise() - .then(p -> stream.forEach(t -> spes.publish(t)) - .onResolve(() -> spes.close())); - return spes; + } + }; + }; } }; } + private static class MultiplexingConsumer<T> implements PushEventConsumer<T> { + + private final AtomicReference<PushEvent<T>> terminalEventStore; + + private final CopyOnWriteArrayList<PushEventConsumer<? super T>> consumers; + + public MultiplexingConsumer( + AtomicReference<PushEvent<T>> terminalEventStore, + CopyOnWriteArrayList<PushEventConsumer< ? super T>> consumers) { + super(); + this.terminalEventStore = terminalEventStore; + this.consumers = consumers; + } + + @Override + public long accept(PushEvent< ? extends T> event) throws Exception { + boolean isTerminal = event.isTerminal(); + if(isTerminal) { + if(!terminalEventStore.compareAndSet(null, event.nodata())) { + // We got a duplicate terminal, silently ignore it + return -1; + } + for (PushEventConsumer< ? super T> pushEventConsumer : consumers) { + if(consumers.remove(pushEventConsumer)) { + try { + pushEventConsumer.accept(event); + } catch (Exception ex) { + // TODO Auto-generated catch block + ex.printStackTrace(); + } + } + } + return -1; + } else { + long maxBP = 0; + for (PushEventConsumer< ? super T> pushEventConsumer : consumers) { + try { + long tmpBP = pushEventConsumer.accept(event); + + if(tmpBP < 0 && consumers.remove(pushEventConsumer)) { + try { + pushEventConsumer.accept(PushEvent.close()); + } catch (Exception ex) { + // TODO Auto-generated catch block + ex.printStackTrace(); + } + } else if (tmpBP > maxBP) { + maxBP = tmpBP; + } + } catch (Exception ex) { + if(consumers.remove(pushEventConsumer)) { + try { + pushEventConsumer.accept(PushEvent.error(ex)); + } catch (Exception ex2) { + // TODO Auto-generated catch block + ex2.printStackTrace(); + } + } + } + } + return maxBP; + } + } + } /** * Create a {@link SimplePushEventSource} with the supplied type and default @@ -345,7 +530,7 @@ public final class PushStreamProvider { boolean closeExecutorOnClose; Executor toUse; if (executor == null) { - toUse = Executors.newFixedThreadPool(2); + toUse = Executors.newFixedThreadPool(parallelism); closeExecutorOnClose = true; } else { toUse = executor; @@ -361,7 +546,8 @@ public final class PushStreamProvider { } SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>( - toUse, acquireScheduler(), queuePolicy, queue, parallelism, + new PushStreamExecutors(toUse, acquireScheduler()), queuePolicy, + queue, parallelism, () -> { try { onClose.run(); @@ -437,7 +623,8 @@ public final class PushStreamProvider { public PushEventConsumer<T> build() { PushEventPipe<T> pipe = new PushEventPipe<>(); - createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure) + createStream(pipe, concurrency, worker, timer, buffer, + bufferingPolicy, backPressure) .forEachEvent(delegate); return pipe; @@ -501,7 +688,7 @@ public final class PushStreamProvider { return () -> closed.set(true); }; - return this.<T> createUnbufferedStream(pes, null); + return this.<T> createUnbufferedStream(pes, null, null); } /** @@ -511,24 +698,36 @@ public final class PushStreamProvider { * * @param executor The worker to use to push items from the Stream into the * PushStream + * @param scheduler The scheduler to use to trigger timed events in the + * PushStream * @param items The items to push into the PushStream * @return A PushStream containing the items from the Java Stream */ - public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) { + public <T> PushStream<T> streamOf(Executor executor, + ScheduledExecutorService scheduler, Stream<T> items) { boolean closeExecutorOnClose; - Executor toUse; + Executor workerToUse; if (executor == null) { - toUse = Executors.newFixedThreadPool(2); + workerToUse = Executors.newFixedThreadPool(2); closeExecutorOnClose = true; } else { - toUse = executor; + workerToUse = executor; closeExecutorOnClose = false; } - @SuppressWarnings("resource") + boolean releaseSchedulerOnClose; + ScheduledExecutorService timerToUse; + if (scheduler == null) { + timerToUse = acquireScheduler(); + releaseSchedulerOnClose = true; + } else { + timerToUse = scheduler; + releaseSchedulerOnClose = false; + } + PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>( - this, toUse, acquireScheduler(), aec -> { + this, new PushStreamExecutors(workerToUse, timerToUse), aec -> { return () -> { /* No action to take */ }; }) { @@ -537,7 +736,7 @@ public final class PushStreamProvider { if (super.begin()) { Iterator<T> it = items.iterator(); - toUse.execute(() -> pushData(it)); + executors.execute(() -> pushData(it)); return true; } @@ -554,8 +753,9 @@ public final class PushStreamProvider { close(); return; } else { - scheduler.schedule( - () -> toUse.execute(() -> pushData(it)), + executors.schedule( + () -> executors + .execute(() -> pushData(it)), returnValue, MILLISECONDS); return; } @@ -568,13 +768,7 @@ public final class PushStreamProvider { } }; - stream = stream.onClose(() -> { - if (closeExecutorOnClose) { - ((ExecutorService) toUse).shutdown(); - } - releaseScheduler(); - }).map(x -> x); - - return stream; + return cleanupThreads(closeExecutorOnClose, workerToUse, + releaseSchedulerOnClose, stream); } } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java index 747b4530d..f47bcb1e7 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java @@ -34,7 +34,8 @@ public interface SimplePushEventSource<T> * any more events published by it. Calling this method sends a close event * to all connected consumers. After calling this method any * {@link PushEventConsumer} that tries to {@link #open(PushEventConsumer)} - * this source will immediately receive a close event. + * this source will immediately receive a close event, and will not see any + * remaining buffered events. */ @Override void close(); @@ -53,7 +54,11 @@ public interface SimplePushEventSource<T> /** * Close this source for now, but potentially reopen it later. Calling this - * method asynchronously sends a close event to all connected consumers. + * method asynchronously sends a close event to all connected consumers and + * then disconnects them. Any events previously queued by the + * {@link #publish(Object)} method will be delivered before this close + * event. + * <p> * After calling this method any {@link PushEventConsumer} that wishes may * {@link #open(PushEventConsumer)} this source, and will receive subsequent * events. @@ -62,7 +67,11 @@ public interface SimplePushEventSource<T> /** * Close this source for now, but potentially reopen it later. Calling this - * method asynchronously sends an error event to all connected consumers. + * method asynchronously sends an error event to all connected consumers and + * then disconnects them. Any events previously queued by the + * {@link #publish(Object)} method will be delivered before this error + * event. + * <p> * After calling this method any {@link PushEventConsumer} that wishes may * {@link #open(PushEventConsumer)} this source, and will receive subsequent * events. diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java index e31c9bf59..094a580ab 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import static java.util.Collections.emptyList; @@ -7,9 +23,7 @@ import static java.util.stream.Collectors.toList; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import org.osgi.util.promise.Deferred; @@ -18,12 +32,11 @@ import org.osgi.util.promise.Promises; class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> implements SimplePushEventSource<T> { - + private final Object lock = new Object(); - private final Executor worker; - - private final ScheduledExecutorService scheduler; + private final PushStreamExecutors executors; + private final PushStreamExecutors sameThread; private final QueuePolicy<T,U> queuePolicy; @@ -44,11 +57,13 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends private boolean waitForFinishes; - public SimplePushEventSourceImpl(Executor worker, - ScheduledExecutorService scheduler, QueuePolicy<T,U> queuePolicy, + public SimplePushEventSourceImpl(PushStreamExecutors executors, + QueuePolicy<T,U> queuePolicy, U queue, int parallelism, Runnable onClose) { - this.worker = worker; - this.scheduler = scheduler; + this.executors = executors; + this.sameThread = new PushStreamExecutors( + PushStreamExecutors.inlineExecutor(), + executors.scheduledExecutor()); this.queuePolicy = queuePolicy; this.queue = queue; this.parallelism = parallelism; @@ -96,7 +111,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) { try { - worker.execute(() -> safePush(pec, event)); + executors.execute(() -> safePush(pec, event)); } catch (RejectedExecutionException ree) { // TODO log? if (!event.isTerminal()) { @@ -107,21 +122,22 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends } } - @SuppressWarnings("boxing") private Promise<Long> doSendWithBackPressure( PushEventConsumer< ? super T> pec, PushEvent<T> event) { - Deferred<Long> d = new Deferred<>(); + Deferred<Long> d = sameThread.deferred(); try { - worker.execute( - () -> d.resolve(System.nanoTime() + safePush(pec, event))); + executors.execute( + () -> d.resolve(Long.valueOf( + System.nanoTime() + safePush(pec, event)))); } catch (RejectedExecutionException ree) { // TODO log? + if (!event.isTerminal()) { close(PushEvent.error(ree)); - return Promises.resolved(System.nanoTime()); + d.resolve(Long.valueOf(System.nanoTime())); } else { - return Promises - .resolved(System.nanoTime() + safePush(pec, event)); + d.resolve( + Long.valueOf(System.nanoTime() + safePush(pec, event))); } } return d.getPromise(); @@ -135,7 +151,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends closeConsumer(pec, PushEvent.close()); return -1; } - return backpressure; + return event.isTerminal() ? -1 : backpressure; } catch (Exception e) { // TODO log? if (!event.isTerminal()) { @@ -221,13 +237,13 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends "unchecked", "boxing" }) private void startWorker() { - worker.execute(() -> { + executors.execute(() -> { try { for(;;) { PushEvent<T> event; List<PushEventConsumer< ? super T>> toCall; - boolean resetWait = false; + boolean resetWait; synchronized (lock) { if(waitForFinishes) { semaphore.release(); @@ -244,6 +260,11 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends break; } + if (connected.isEmpty()) { + queue.clear(); + break; + } + toCall = new ArrayList<>(connected); if (event.isTerminal()) { waitForFinishes = true; @@ -252,40 +273,39 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends while (!semaphore.tryAcquire(parallelism - 1)) { lock.wait(); } - } - } - - List<Promise<Long>> calls = toCall.stream().map(pec -> { - if (semaphore.tryAcquire()) { - try { - return doSendWithBackPressure(pec, event); - } finally { - semaphore.release(); - } } else { - return Promises.resolved( - System.nanoTime() + safePush(pec, event)); + resetWait = false; } - }).collect(toList()); - - long toWait = Promises.all(calls) - .map(l -> l.stream() - .max(Long::compareTo) - .orElseGet(() -> System.nanoTime())) - .getValue() - System.nanoTime(); + } + Promise<Long> backPressure = deliver(toCall, event); - if (toWait > 0) { - scheduler.schedule(this::startWorker, toWait, - NANOSECONDS); - return; - } + if (backPressure.isDone()) { + handleReset(resetWait); - if (resetWait == true) { - synchronized (lock) { - waitForFinishes = false; - lock.notifyAll(); + long toWait = backPressure.getValue() + - System.nanoTime(); + + if (toWait > 0) { + executors.schedule(this::startWorker, toWait, + NANOSECONDS); + return; } + } else { + backPressure.then(p -> { + handleReset(resetWait); + long toWait = p.getValue() - System.nanoTime(); + + if (toWait > 0) { + executors.schedule(this::startWorker, toWait, + NANOSECONDS); + } else { + startWorker(); + } + return p; + }, p -> close( + PushEvent.error((Exception) p.getFailure()))); + return; } } @@ -304,6 +324,41 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends } + private void handleReset(boolean resetWait) { + if (resetWait == true) { + synchronized (lock) { + waitForFinishes = false; + lock.notifyAll(); + } + } + } + + private Promise<Long> deliver(List<PushEventConsumer< ? super T>> toCall, + PushEvent<T> event) { + if (toCall.size() == 1) { + return doCall(event, toCall.get(0)); + } else { + List<Promise<Long>> calls = toCall.stream().map(pec -> { + if (semaphore.tryAcquire()) { + return doSendWithBackPressure(pec, event) + .onResolve(() -> semaphore.release()); + } else { + return doCall(event, pec); + } + }).collect(toList()); + return Promises + .all(sameThread.deferred(), calls) + .map(l -> l.stream().max(Long::compareTo).orElseGet( + () -> Long.valueOf(System.nanoTime()))); + } + } + + private Promise<Long> doCall(PushEvent<T> event, + PushEventConsumer< ? super T> pec) { + return sameThread.resolved( + Long.valueOf(System.nanoTime() + safePush(pec, event))); + } + @Override public boolean isConnected() { synchronized (lock) { @@ -320,17 +375,17 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends if (connected.isEmpty()) { if (connectPromise == null) { - connectPromise = new Deferred<>(); + connectPromise = executors.deferred(); } return connectPromise.getPromise(); } else { - return Promises.resolved(null); + return executors.resolved(null); } } } private Promise<Void> closedConnectPromise() { - return Promises.failed(new IllegalStateException( + return executors.failed(new IllegalStateException( "This SimplePushEventSource is closed")); } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java index d6116e34e..53400c4c2 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java @@ -1,11 +1,25 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import static java.util.Optional.ofNullable; import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -17,9 +31,9 @@ class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T protected final AtomicReference<AutoCloseable> upstream = new AtomicReference<AutoCloseable>(); UnbufferedPushStreamImpl(PushStreamProvider psp, - Executor executor, ScheduledExecutorService scheduler, + PushStreamExecutors executors, Function<PushEventConsumer<T>,AutoCloseable> connector) { - super(psp, executor, scheduler); + super(psp, executors); this.connector = connector; } |