Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Watson2017-11-13 18:53:17 +0000
committerThomas Watson2017-11-13 19:03:55 +0000
commitb2a818de0268db42a685fcbdb413407f1e4f0232 (patch)
tree9695ba715ae24afe5e4bf91e5217a9f6b142ae71
parentc51bc3e5f7a5378d3a315d48a96128d110d6a1da (diff)
downloadrt.equinox.bundles-I20171113-2000.tar.gz
rt.equinox.bundles-I20171113-2000.tar.xz
rt.equinox.bundles-I20171113-2000.zip
Bug 527221 - [osgi R7] update log stream impl and APII20171113-2000
Change-Id: Ic50dfd9847d8a694c3c49dbe88dded6862d44f3f Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java35
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java147
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java26
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java33
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java23
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java2
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java35
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java33
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java40
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java51
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java330
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java15
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java159
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java22
14 files changed, 750 insertions, 201 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java
index a37e407fd..749efc1a3 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java
@@ -1,16 +1,34 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.osgi.util.pushstream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent< ? extends T>>>
implements BufferBuilder<R,T,U> {
- protected Executor worker;
- protected int concurrency;
- protected PushbackPolicy<T,U> backPressure;
- protected QueuePolicy<T,U> bufferingPolicy;
- protected U buffer;
+ protected Executor worker;
+ protected ScheduledExecutorService timer;
+ protected int concurrency;
+ protected PushbackPolicy<T,U> backPressure;
+ protected QueuePolicy<T,U> bufferingPolicy;
+ protected U buffer;
@Override
public BufferBuilder<R,T,U> withBuffer(U queue) {
@@ -57,4 +75,11 @@ abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent< ?
this.worker = executor;
return this;
}
+
+ @Override
+ public BufferBuilder<R,T,U> withScheduler(
+ ScheduledExecutorService scheduler) {
+ this.timer = scheduler;
+ return this;
+ }
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
index d64ac129d..ef8bb1420 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.osgi.util.pushstream;
import static java.util.Collections.emptyList;
@@ -23,7 +39,6 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -39,7 +54,9 @@ import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;
+import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
+import java.util.function.ToLongBiFunction;
import java.util.stream.Collector;
import java.util.stream.Collectors;
@@ -60,8 +77,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
protected final PushStreamProvider psp;
- protected final Executor defaultExecutor;
- protected final ScheduledExecutorService scheduler;
+ protected final PushStreamExecutors executors;
protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING);
@@ -75,10 +91,9 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
protected abstract void upstreamClose(PushEvent< ? > close);
AbstractPushStreamImpl(PushStreamProvider psp,
- Executor executor, ScheduledExecutorService scheduler) {
+ PushStreamExecutors executors) {
this.psp = psp;
- this.defaultExecutor = executor;
- this.scheduler = scheduler;
+ this.executors = executors;
}
protected long handleEvent(PushEvent< ? extends T> event) {
@@ -201,7 +216,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> filter(Predicate< ? super T> predicate) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
+ psp, executors, this);
updateNext((event) -> {
try {
if (!event.isTerminal()) {
@@ -224,7 +239,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) {
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
+ psp, executors, this);
updateNext(event -> {
try {
if (!event.isTerminal()) {
@@ -245,7 +260,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R> PushStream<R> flatMap(
Function< ? super T, ? extends PushStream< ? extends R>> mapper) {
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
+ psp, executors, this);
PushEventConsumer<R> consumer = e -> {
switch (e.getType()) {
@@ -305,7 +320,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public PushStream<T> sorted(Comparator< ? super T> comparator) {
List<T> list = Collections.synchronizedList(new ArrayList<>());
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
+ psp, executors, this);
updateNext(event -> {
try {
switch(event.getType()) {
@@ -340,7 +355,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
throw new IllegalArgumentException("The limit must be greater than zero");
}
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
+ psp, executors, this);
AtomicLong counter = new AtomicLong(maxSize);
updateNext(event -> {
try {
@@ -366,11 +381,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> limit(Duration maxTime) {
- Runnable start = () -> scheduler.schedule(() -> close(),
+ Runnable start = () -> executors.schedule(() -> close(),
maxTime.toNanos(), NANOSECONDS);
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
- psp, defaultExecutor, scheduler, this) {
+ psp, executors, this) {
@Override
protected void beginning() {
start.run();
@@ -394,11 +409,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long timeout = maxTime.toNanos();
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
- psp, defaultExecutor, scheduler, this) {
+ psp, executors, this) {
@Override
protected void beginning() {
lastTime.set(System.nanoTime());
- scheduler.schedule(() -> check(lastTime, timeout), timeout,
+ executors.schedule(() -> check(lastTime, timeout), timeout,
NANOSECONDS);
}
};
@@ -419,7 +434,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long elapsed = now - lastTime.get();
if (elapsed < timeout) {
- scheduler.schedule(() -> check(lastTime, timeout),
+ executors.schedule(() -> check(lastTime, timeout),
timeout - elapsed, NANOSECONDS);
} else {
PushEvent<T> error = PushEvent.error(new TimeoutException());
@@ -436,7 +451,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
"The number to skip must be greater than or equal to zero");
}
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
+ psp, executors, this);
AtomicLong counter = new AtomicLong(n);
updateNext(event -> {
try {
@@ -460,7 +475,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> fork(int n, int delay, Executor ex) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, ex, scheduler, this);
+ psp, new PushStreamExecutors(ex, executors.scheduledExecutor()),
+ this);
Semaphore s = new Semaphore(n);
updateNext(event -> {
try {
@@ -522,7 +538,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public PushStream<T> merge(
PushEventSource< ? extends T> source) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
+ psp, executors, this);
AtomicInteger count = new AtomicInteger(2);
PushEventConsumer<T> consumer = event -> {
try {
@@ -604,7 +620,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@SuppressWarnings("resource")
AbstractPushStreamImpl<T> eventStream = new AbstractPushStreamImpl<T>(
- psp, defaultExecutor, scheduler) {
+ psp, executors) {
@Override
protected boolean begin() {
if (closed.compareAndSet(BUILDING, STARTED)) {
@@ -644,8 +660,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length);
AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length];
for(int i = 0; i < tests.length; i++) {
- rsult[i] = new IntermediatePushStreamImpl<>(psp, defaultExecutor,
- scheduler, this);
+ rsult[i] = new IntermediatePushStreamImpl<>(psp, executors, this);
}
Boolean[] array = new Boolean[tests.length];
@@ -701,7 +716,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> sequential() {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
+ psp, executors, this);
Lock lock = new ReentrantLock();
updateNext((event) -> {
try {
@@ -723,7 +738,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R> PushStream<R> coalesce(
Function< ? super T,Optional<R>> accumulator) {
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
+ psp, executors, this);
updateNext((event) -> {
try {
if (!event.isTerminal()) {
@@ -770,7 +785,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@SuppressWarnings("resource")
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
- psp, defaultExecutor, scheduler, this) {
+ psp, executors, this) {
@Override
protected void beginning() {
init.run();
@@ -842,7 +857,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public <R> PushStream<R> window(Duration time,
Function<Collection<T>,R> f) {
- return window(time, defaultExecutor, f);
+ return window(time, executors.executor(), f);
}
@Override
@@ -861,7 +876,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R> PushStream<R> window(Supplier<Duration> time,
IntSupplier maxEvents,
BiFunction<Long,Collection<T>,R> f) {
- return window(time, maxEvents, defaultExecutor, f);
+ return window(time, maxEvents, executors.executor(), f);
}
@Override
@@ -887,7 +902,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long windowSize = time.get().toNanos();
previousWindowSize.set(windowSize);
- scheduler.schedule(
+ executors.schedule(
getWindowTask(p, f, time, maxEvents, lock, count,
queueRef, timestamp, counter,
previousWindowSize, ex),
@@ -899,7 +914,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@SuppressWarnings("resource")
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
- psp, ex, scheduler, this) {
+ psp, new PushStreamExecutors(ex, executors.scheduledExecutor()),
+ this) {
@Override
protected void beginning() {
begin.accept(this);
@@ -952,7 +968,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long nextWindow = time.get().toNanos();
long backpressure = previousWindowSize.getAndSet(nextWindow)
- elapsed;
- scheduler.schedule(
+ executors.schedule(
getWindowTask(eventStream, f, time, maxEvents, lock,
newCount, queueRef, timestamp, counter,
previousWindowSize, ex),
@@ -1178,7 +1194,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long nextWindow = time.get().toNanos();
previousWindowSize.set(nextWindow);
queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
- scheduler.schedule(
+ executors.schedule(
getWindowTask(eventStream, f, time, maxEvents, lock,
expectedCounter + 1, queueRef, timestamp, counter,
previousWindowSize, executor),
@@ -1208,8 +1224,51 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
}
@Override
+ public PushStream<T> adjustBackPressure(LongUnaryOperator adjustment) {
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+ psp, executors, this);
+ updateNext(event -> {
+ try {
+ long bp = eventStream.handleEvent(event);
+ if (event.isTerminal()) {
+ return ABORT;
+ } else {
+ return bp < 0 ? bp : adjustment.applyAsLong(bp);
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> adjustBackPressure(
+ ToLongBiFunction<T,Long> adjustment) {
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+ psp, executors, this);
+ updateNext(event -> {
+ try {
+ long bp = eventStream.handleEvent(event);
+ if (event.isTerminal()) {
+ return ABORT;
+ } else {
+ return bp < 0 ? bp
+ : adjustment.applyAsLong(event.getData(),
+ Long.valueOf(bp));
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
public Promise<Void> forEach(Consumer< ? super T> action) {
- Deferred<Void> d = new Deferred<>();
+ Deferred<Void> d = executors.deferred();
updateNext((event) -> {
try {
switch(event.getType()) {
@@ -1223,10 +1282,10 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
d.fail(event.getFailure());
break;
}
- close(event.nodata());
+ close(event.nodata());
return ABORT;
} catch (Exception e) {
- d.fail(e);
+ close(PushEvent.error(e));
return ABORT;
}
});
@@ -1248,7 +1307,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) {
- Deferred<T> d = new Deferred<>();
+ Deferred<T> d = executors.deferred();
AtomicReference<T> iden = new AtomicReference<T>(identity);
updateNext(event -> {
@@ -1277,7 +1336,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) {
- Deferred<Optional<T>> d = new Deferred<>();
+ Deferred<Optional<T>> d = executors.deferred();
AtomicReference<T> iden = new AtomicReference<T>(null);
updateNext(event -> {
@@ -1307,7 +1366,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
- Deferred<U> d = new Deferred<>();
+ Deferred<U> d = executors.deferred();
AtomicReference<U> iden = new AtomicReference<>(identity);
updateNext(event -> {
@@ -1338,7 +1397,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) {
A result = collector.supplier().get();
BiConsumer<A, ? super T> accumulator = collector.accumulator();
- Deferred<R> d = new Deferred<>();
+ Deferred<R> d = executors.deferred();
PushEventConsumer<T> consumer;
if (collector.characteristics()
@@ -1405,7 +1464,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Long> count() {
- Deferred<Long> d = new Deferred<>();
+ Deferred<Long> d = executors.deferred();
LongAdder counter = new LongAdder();
updateNext((event) -> {
try {
@@ -1451,7 +1510,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Optional<T>> findFirst() {
- Deferred<Optional<T>> d = new Deferred<>();
+ Deferred<Optional<T>> d = executors.deferred();
updateNext((event) -> {
try {
Optional<T> o = null;
@@ -1485,7 +1544,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) {
- Deferred<Long> d = new Deferred<>();
+ Deferred<Long> d = executors.deferred();
LongAdder la = new LongAdder();
updateNext((event) -> {
try {
@@ -1497,15 +1556,17 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
case CLOSE:
try {
action.accept(event);
- } finally {
d.resolve(Long.valueOf(la.sum()));
+ } catch (Exception e) {
+ d.fail(e);
}
break;
case ERROR:
try {
action.accept(event);
- } finally {
d.fail(event.getFailure());
+ } catch (Exception e) {
+ d.fail(e);
}
break;
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
index dfcf8b503..826c730ea 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
@@ -1,7 +1,24 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.osgi.util.pushstream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import org.osgi.annotation.versioning.ProviderType;
@@ -73,6 +90,15 @@ public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends
* @return this builder
*/
BufferBuilder<R, T, U> withExecutor(Executor executor);
+
+ /**
+ * Set the {@link ScheduledExecutorService} that should be used to trigger
+ * timed events after this buffer
+ *
+ * @param scheduler
+ * @return this builder
+ */
+ BufferBuilder<R,T,U> withScheduler(ScheduledExecutorService scheduler);
/**
* @return the object being built
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
index 7cedafb5c..7a8d1636e 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.osgi.util.pushstream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -5,8 +21,6 @@ import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED;
import static org.osgi.util.pushstream.PushEventConsumer.ABORT;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
@@ -18,8 +32,6 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
private final Semaphore semaphore;
- private final Executor worker;
-
private final QueuePolicy<T, U> queuePolicy;
private final PushbackPolicy<T, U> pushbackPolicy;
@@ -34,15 +46,14 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
private final int parallelism;
BufferedPushStreamImpl(PushStreamProvider psp,
- ScheduledExecutorService scheduler, U eventQueue,
- int parallelism, Executor worker, QueuePolicy<T,U> queuePolicy,
+ PushStreamExecutors executors, U eventQueue, int parallelism,
+ QueuePolicy<T,U> queuePolicy,
PushbackPolicy<T,U> pushbackPolicy,
Function<PushEventConsumer<T>,AutoCloseable> connector) {
- super(psp, worker, scheduler, connector);
+ super(psp, executors, connector);
this.eventQueue = eventQueue;
this.parallelism = parallelism;
this.semaphore = new Semaphore(parallelism);
- this.worker = worker;
this.queuePolicy = queuePolicy;
this.pushbackPolicy = pushbackPolicy;
}
@@ -74,7 +85,7 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
}
private void startWorker() {
- worker.execute(() -> {
+ executors.execute(() -> {
try {
PushEvent< ? extends T> event;
while ((event = eventQueue.poll()) != null) {
@@ -88,12 +99,12 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
close();
return;
} else if(backpressure > 0) {
- scheduler.schedule(this::startWorker, backpressure,
+ executors.schedule(this::startWorker, backpressure,
MILLISECONDS);
return;
}
}
-
+ // Only release this now the queue is empty
semaphore.release();
} catch (Exception e) {
close(PushEvent.error(e));
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
index 320adeebc..678751215 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
@@ -1,19 +1,32 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.osgi.util.pushstream;
import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-
class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
implements PushStream<T> {
private final AbstractPushStreamImpl< ? > previous;
IntermediatePushStreamImpl(PushStreamProvider psp,
- Executor executor, ScheduledExecutorService scheduler,
+ PushStreamExecutors executors,
AbstractPushStreamImpl< ? > previous) {
- super(psp, executor, scheduler);
+ super(psp, executors);
this.previous = previous;
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
index 968f668f6..57a3bb99a 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
index 1471a84f9..6d5953e34 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,7 +27,9 @@ import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;
+import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
+import java.util.function.ToLongBiFunction;
import java.util.stream.Collector;
import org.osgi.annotation.versioning.ProviderType;
@@ -187,8 +189,8 @@ public interface PushStream<T> extends AutoCloseable {
* @param delay Nr of ms/thread that is queued back pressure
* @param e an executor to use for the background threads.
* @return Builder style (can be a new or the same object)
- * @throws IllegalArgumentException if the number of threads is < 1 or the
- * delay is < 0
+ * @throws IllegalArgumentException if the number of threads is &lt; 1 or
+ * the delay is &lt; 0
* @throws NullPointerException if the Executor is null
*/
PushStream<T> fork(int n, int delay, Executor e)
@@ -430,6 +432,33 @@ public interface PushStream<T> extends AutoCloseable {
BiFunction<Long,Collection<T>,R> f);
/**
+ * Changes the back-pressure propagated by this pipeline stage.
+ * <p>
+ * The supplied function receives the back pressure returned by the next
+ * pipeline stage and returns the back pressure that should be returned by
+ * this stage. This function will not be called if the previous pipeline
+ * stage returns negative back pressure.
+ *
+ * @param adjustment
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> adjustBackPressure(LongUnaryOperator adjustment);
+
+ /**
+ * Changes the back-pressure propagated by this pipeline stage.
+ * <p>
+ * The supplied function receives the data object passed to the next
+ * pipeline stage and the back pressure that was returned by that stage when
+ * accepting it. The function returns the back pressure that should be
+ * returned by this stage. This function will not be called if the previous
+ * pipeline stage returns negative back pressure.
+ *
+ * @param adjustment
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> adjustBackPressure(ToLongBiFunction<T,Long> adjustment);
+
+ /**
* Execute the action for each event received until the channel is closed.
* This is a terminating method, the returned promise is resolved when the
* channel closes.
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
index 506c8f2e7..d773deddf 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
@@ -1,7 +1,24 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.osgi.util.pushstream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import org.osgi.annotation.versioning.ProviderType;
@@ -20,7 +37,18 @@ public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? exten
/**
* Tells this {@link PushStreamBuilder} to create an unbuffered stream which
* delivers events directly to its consumer using the incoming delivery
- * thread.
+ * thread. Setting the {@link PushStreamBuilder} to be unbuffered means that
+ * any buffer, queue policy or push back policy will be ignored. Note that
+ * calling one of:
+ * <ul>
+ * <li>{@link #withBuffer(BlockingQueue)}</li>
+ * <li>{@link #withQueuePolicy(QueuePolicy)}</li>
+ * <li>{@link #withQueuePolicy(QueuePolicyOption)}</li>
+ * <li>{@link #withPushbackPolicy(PushbackPolicy)}</li>
+ * <li>{@link #withPushbackPolicy(PushbackPolicyOption, long)}</li>
+ * <li>{@link #withParallelism(int)}</li>
+ * </ul>
+ * after this method will reset this builder to require a buffer.
*
* @return the builder
*/
@@ -52,4 +80,7 @@ public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? exten
@Override
PushStreamBuilder<T,U> withExecutor(Executor executor);
+
+ @Override
+ PushStreamBuilder<T,U> withScheduler(ScheduledExecutorService scheduler);
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
index 0bf8d7127..75fbf07a5 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
@@ -1,22 +1,41 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.osgi.util.pushstream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
extends AbstractBufferBuilder<PushStream<T>,T,U>
implements PushStreamBuilder<T,U> {
- private final PushStreamProvider psp;
+ private final PushStreamProvider psp;
private final PushEventSource<T> eventSource;
private final Executor previousExecutor;
+ private final ScheduledExecutorService previousScheduler;
private boolean unbuffered;
PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor,
- PushEventSource<T> eventSource) {
+ ScheduledExecutorService defaultScheduler, PushEventSource<T> eventSource) {
this.psp = psp;
this.previousExecutor = defaultExecutor;
+ this.previousScheduler = defaultScheduler;
this.eventSource = eventSource;
this.worker = defaultExecutor;
}
@@ -66,11 +85,16 @@ class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
@Override
public PushStreamBuilder<T,U> withExecutor(Executor executor) {
- unbuffered = false;
return (PushStreamBuilder<T,U>) super.withExecutor(executor);
}
@Override
+ public PushStreamBuilder<T,U> withScheduler(
+ ScheduledExecutorService scheduler) {
+ return (PushStreamBuilder<T,U>) super.withScheduler(scheduler);
+ }
+
+ @Override
public PushStreamBuilder<T,U> unbuffered() {
unbuffered = true;
return this;
@@ -78,10 +102,16 @@ class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
@Override
public PushStream<T> build() {
+ Executor workerToUse = worker == null ? previousExecutor : worker;
+ ScheduledExecutorService timerToUse = timer == null ? previousScheduler
+ : timer;
+
if (unbuffered) {
- return psp.createUnbufferedStream(eventSource, previousExecutor);
+ return psp.createUnbufferedStream(eventSource, workerToUse,
+ timerToUse);
} else {
- return psp.createStream(eventSource, concurrency, worker, buffer,
+ return psp.createStream(eventSource, concurrency, workerToUse,
+ timerToUse, buffer,
bufferingPolicy, backPressure);
}
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java
new file mode 100644
index 000000000..7c2509eef
--- /dev/null
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) OSGi Alliance (2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.osgi.util.promise.PromiseExecutors;
+
+class PushStreamExecutors extends PromiseExecutors {
+ PushStreamExecutors(Executor executor, ScheduledExecutorService scheduler) {
+ super(requireNonNull(executor), requireNonNull(scheduler));
+ }
+
+ void execute(Runnable operation) {
+ executor().execute(operation);
+ }
+
+ ScheduledFuture< ? > schedule(Runnable operation, long delay,
+ TimeUnit unit) {
+ return scheduledExecutor().schedule(operation, delay, unit);
+ }
+
+ @Override
+ protected Executor executor() {
+ return super.executor();
+ }
+
+ @Override
+ protected ScheduledExecutorService scheduledExecutor() {
+ return super.scheduledExecutor();
+ }
+}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
index f63c66210..c7d861bf3 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,11 +25,13 @@ import static org.osgi.util.pushstream.QueuePolicyOption.FAIL;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
@@ -44,7 +46,7 @@ public final class PushStreamProvider {
private int schedulerReferences;
- private ScheduledExecutorService scheduler;
+ private ScheduledExecutorService sharedScheduler;
private ScheduledExecutorService acquireScheduler() {
try {
@@ -53,9 +55,10 @@ public final class PushStreamProvider {
schedulerReferences += 1;
if (schedulerReferences == 1) {
- scheduler = Executors.newSingleThreadScheduledExecutor();
+ sharedScheduler = Executors
+ .newSingleThreadScheduledExecutor();
}
- return scheduler;
+ return sharedScheduler;
} finally {
lock.unlock();
}
@@ -72,8 +75,8 @@ public final class PushStreamProvider {
schedulerReferences -= 1;
if (schedulerReferences == 0) {
- scheduler.shutdown();
- scheduler = null;
+ sharedScheduler.shutdown();
+ sharedScheduler = null;
}
} finally {
lock.unlock();
@@ -112,7 +115,8 @@ public final class PushStreamProvider {
* @return A {@link PushStream} with a default initial buffer
*/
public <T> PushStream<T> createStream(PushEventSource<T> eventSource) {
- return createStream(eventSource, 1, null, new ArrayBlockingQueue<>(32),
+ return createStream(eventSource, 1, null, null,
+ new ArrayBlockingQueue<>(32),
FAIL.getPolicy(), LINEAR.getPolicy(1000));
}
@@ -130,7 +134,7 @@ public final class PushStreamProvider {
*/
public <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildStream(
PushEventSource<T> eventSource) {
- return new PushStreamBuilderImpl<T,U>(this, null, eventSource);
+ return new PushStreamBuilderImpl<T,U>(this, null, null, eventSource);
}
@SuppressWarnings({
@@ -138,7 +142,8 @@ public final class PushStreamProvider {
})
<T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream(
PushEventSource<T> eventSource, int parallelism, Executor executor,
- U queue, QueuePolicy<T,U> queuePolicy,
+ ScheduledExecutorService scheduler, U queue,
+ QueuePolicy<T,U> queuePolicy,
PushbackPolicy<T,U> pushbackPolicy) {
if (eventSource == null) {
@@ -154,15 +159,25 @@ public final class PushStreamProvider {
}
boolean closeExecutorOnClose;
- Executor toUse;
+ Executor workerToUse;
if (executor == null) {
- toUse = Executors.newFixedThreadPool(parallelism);
+ workerToUse = Executors.newFixedThreadPool(parallelism);
closeExecutorOnClose = true;
} else {
- toUse = executor;
+ workerToUse = executor;
closeExecutorOnClose = false;
}
+ boolean releaseSchedulerOnClose;
+ ScheduledExecutorService timerToUse;
+ if (scheduler == null) {
+ timerToUse = acquireScheduler();
+ releaseSchedulerOnClose = true;
+ } else {
+ timerToUse = scheduler;
+ releaseSchedulerOnClose = false;
+ }
+
if (queue == null) {
queue = (U) new ArrayBlockingQueue(32);
}
@@ -175,9 +190,9 @@ public final class PushStreamProvider {
pushbackPolicy = LINEAR.getPolicy(1000);
}
- @SuppressWarnings("resource")
PushStream<T> stream = new BufferedPushStreamImpl<>(this,
- acquireScheduler(), queue, parallelism, toUse, queuePolicy,
+ new PushStreamExecutors(workerToUse, timerToUse), queue,
+ parallelism, queuePolicy,
pushbackPolicy, aec -> {
try {
return eventSource.open(aec);
@@ -187,31 +202,51 @@ public final class PushStreamProvider {
}
});
- stream = stream.onClose(() -> {
- if (closeExecutorOnClose) {
- ((ExecutorService) toUse).shutdown();
- }
- releaseScheduler();
- }).map(x -> x);
+ return cleanupThreads(closeExecutorOnClose, workerToUse,
+ releaseSchedulerOnClose, stream);
+ }
+
+ private <T> PushStream<T> cleanupThreads(boolean closeExecutorOnClose,
+ Executor workerToUse, boolean releaseSchedulerOnClose,
+ PushStream<T> stream) {
+ if (closeExecutorOnClose || releaseSchedulerOnClose) {
+ stream = stream.onClose(() -> {
+ if (closeExecutorOnClose) {
+ ((ExecutorService) workerToUse).shutdown();
+ }
+ if (releaseSchedulerOnClose) {
+ releaseScheduler();
+ }
+ }).map(x -> x);
+ }
return stream;
}
<T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource,
- Executor executor) {
+ Executor executor, ScheduledExecutorService scheduler) {
boolean closeExecutorOnClose;
- Executor toUse;
+ Executor workerToUse;
if (executor == null) {
- toUse = Executors.newFixedThreadPool(2);
+ workerToUse = Executors.newFixedThreadPool(2);
closeExecutorOnClose = true;
} else {
- toUse = executor;
+ workerToUse = executor;
closeExecutorOnClose = false;
}
- @SuppressWarnings("resource")
- PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, toUse,
- acquireScheduler(), aec -> {
+ boolean releaseSchedulerOnClose;
+ ScheduledExecutorService timerToUse;
+ if (scheduler == null) {
+ timerToUse = acquireScheduler();
+ releaseSchedulerOnClose = true;
+ } else {
+ timerToUse = scheduler;
+ releaseSchedulerOnClose = false;
+ }
+ PushStream<T> stream = new UnbufferedPushStreamImpl<>(this,
+ new PushStreamExecutors(workerToUse, timerToUse),
+ aec -> {
try {
return eventSource.open(aec);
} catch (Exception e) {
@@ -220,14 +255,8 @@ public final class PushStreamProvider {
}
});
- stream = stream.onClose(() -> {
- if (closeExecutorOnClose) {
- ((ExecutorService) toUse).shutdown();
- }
- releaseScheduler();
- }).map(x -> x);
-
- return stream;
+ return cleanupThreads(closeExecutorOnClose, workerToUse,
+ releaseSchedulerOnClose, stream);
}
/**
@@ -256,9 +285,17 @@ public final class PushStreamProvider {
* call to {@link PushEventSource#open(PushEventConsumer)} will begin event
* processing.
*
+ * <p>
* The {@link PushEventSource} will remain active until the backing stream
* is closed, and permits multiple consumers to
- * {@link PushEventSource#open(PushEventConsumer)} it.
+ * {@link PushEventSource#open(PushEventConsumer)} it. Note that this means
+ * the caller of this method is responsible for closing the supplied
+ * stream if it is not finite in length.
+ *
+ * <p>Late joining
+ * consumers will not receive historical events, but will immediately
+ * receive the terminal event which closed the stream if the stream is
+ * already closed.
*
* @param stream
*
@@ -266,26 +303,174 @@ public final class PushStreamProvider {
*/
public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream(
PushStream<T> stream) {
- return new AbstractBufferBuilder<PushEventSource<T>,T,U>() {
+ BufferBuilder<PushStream<T>,T,U> builder = stream.buildBuffer();
+
+ return new BufferBuilder<PushEventSource<T>,T,U>() {
+
+ @Override
+ public BufferBuilder<PushEventSource<T>,T,U> withBuffer(U queue) {
+ builder.withBuffer(queue);
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<PushEventSource<T>,T,U> withQueuePolicy(
+ QueuePolicy<T,U> queuePolicy) {
+ builder.withQueuePolicy(queuePolicy);
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<PushEventSource<T>,T,U> withQueuePolicy(
+ QueuePolicyOption queuePolicyOption) {
+ builder.withQueuePolicy(queuePolicyOption);
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<PushEventSource<T>,T,U> withPushbackPolicy(
+ PushbackPolicy<T,U> pushbackPolicy) {
+ builder.withPushbackPolicy(pushbackPolicy);
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<PushEventSource<T>,T,U> withPushbackPolicy(
+ PushbackPolicyOption pushbackPolicyOption, long time) {
+ builder.withPushbackPolicy(pushbackPolicyOption, time);
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<PushEventSource<T>,T,U> withParallelism(
+ int parallelism) {
+ builder.withParallelism(parallelism);
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<PushEventSource<T>,T,U> withExecutor(
+ Executor executor) {
+ builder.withExecutor(executor);
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<PushEventSource<T>,T,U> withScheduler(
+ ScheduledExecutorService scheduler) {
+ builder.withScheduler(scheduler);
+ return this;
+ }
+
@Override
public PushEventSource<T> build() {
- SimplePushEventSource<T> spes = createSimplePushEventSource(
- concurrency, worker, buffer, bufferingPolicy, () -> {
+
+ AtomicBoolean connect = new AtomicBoolean();
+ AtomicReference<PushEvent<T>> terminalEvent = new AtomicReference<>();
+
+ CopyOnWriteArrayList<PushEventConsumer< ? super T>> consumers = new CopyOnWriteArrayList<>();
+
+ return consumer -> {
+
+ consumers.add(consumer);
+
+ PushEvent<T> terminal = terminalEvent.get();
+ if (terminal != null) {
+ if (consumers.remove(consumer)) {
+ // The stream is already done and we missed it
+ consumer.accept(terminal);
+ }
+ return () -> {
+ //Nothing to do, we have already sent the terminal event
+ };
+ }
+
+ if(!connect.getAndSet(true)) {
+ // connect
+ builder.build()
+ .forEachEvent(new MultiplexingConsumer<T>(
+ terminalEvent, consumers));
+ }
+
+ return () -> {
+ if (consumers.remove(consumer)) {
try {
- stream.close();
- } catch (Exception e) {
+ consumer.accept(PushEvent.close());
+ } catch (Exception ex) {
// TODO Auto-generated catch block
- e.printStackTrace();
+ ex.printStackTrace();
}
- });
- spes.connectPromise()
- .then(p -> stream.forEach(t -> spes.publish(t))
- .onResolve(() -> spes.close()));
- return spes;
+ }
+ };
+ };
}
};
}
+ private static class MultiplexingConsumer<T> implements PushEventConsumer<T> {
+
+ private final AtomicReference<PushEvent<T>> terminalEventStore;
+
+ private final CopyOnWriteArrayList<PushEventConsumer<? super T>> consumers;
+
+ public MultiplexingConsumer(
+ AtomicReference<PushEvent<T>> terminalEventStore,
+ CopyOnWriteArrayList<PushEventConsumer< ? super T>> consumers) {
+ super();
+ this.terminalEventStore = terminalEventStore;
+ this.consumers = consumers;
+ }
+
+ @Override
+ public long accept(PushEvent< ? extends T> event) throws Exception {
+ boolean isTerminal = event.isTerminal();
+ if(isTerminal) {
+ if(!terminalEventStore.compareAndSet(null, event.nodata())) {
+ // We got a duplicate terminal, silently ignore it
+ return -1;
+ }
+ for (PushEventConsumer< ? super T> pushEventConsumer : consumers) {
+ if(consumers.remove(pushEventConsumer)) {
+ try {
+ pushEventConsumer.accept(event);
+ } catch (Exception ex) {
+ // TODO Auto-generated catch block
+ ex.printStackTrace();
+ }
+ }
+ }
+ return -1;
+ } else {
+ long maxBP = 0;
+ for (PushEventConsumer< ? super T> pushEventConsumer : consumers) {
+ try {
+ long tmpBP = pushEventConsumer.accept(event);
+
+ if(tmpBP < 0 && consumers.remove(pushEventConsumer)) {
+ try {
+ pushEventConsumer.accept(PushEvent.close());
+ } catch (Exception ex) {
+ // TODO Auto-generated catch block
+ ex.printStackTrace();
+ }
+ } else if (tmpBP > maxBP) {
+ maxBP = tmpBP;
+ }
+ } catch (Exception ex) {
+ if(consumers.remove(pushEventConsumer)) {
+ try {
+ pushEventConsumer.accept(PushEvent.error(ex));
+ } catch (Exception ex2) {
+ // TODO Auto-generated catch block
+ ex2.printStackTrace();
+ }
+ }
+ }
+ }
+ return maxBP;
+ }
+ }
+ }
/**
* Create a {@link SimplePushEventSource} with the supplied type and default
@@ -345,7 +530,7 @@ public final class PushStreamProvider {
boolean closeExecutorOnClose;
Executor toUse;
if (executor == null) {
- toUse = Executors.newFixedThreadPool(2);
+ toUse = Executors.newFixedThreadPool(parallelism);
closeExecutorOnClose = true;
} else {
toUse = executor;
@@ -361,7 +546,8 @@ public final class PushStreamProvider {
}
SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>(
- toUse, acquireScheduler(), queuePolicy, queue, parallelism,
+ new PushStreamExecutors(toUse, acquireScheduler()), queuePolicy,
+ queue, parallelism,
() -> {
try {
onClose.run();
@@ -437,7 +623,8 @@ public final class PushStreamProvider {
public PushEventConsumer<T> build() {
PushEventPipe<T> pipe = new PushEventPipe<>();
- createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure)
+ createStream(pipe, concurrency, worker, timer, buffer,
+ bufferingPolicy, backPressure)
.forEachEvent(delegate);
return pipe;
@@ -501,7 +688,7 @@ public final class PushStreamProvider {
return () -> closed.set(true);
};
- return this.<T> createUnbufferedStream(pes, null);
+ return this.<T> createUnbufferedStream(pes, null, null);
}
/**
@@ -511,24 +698,36 @@ public final class PushStreamProvider {
*
* @param executor The worker to use to push items from the Stream into the
* PushStream
+ * @param scheduler The scheduler to use to trigger timed events in the
+ * PushStream
* @param items The items to push into the PushStream
* @return A PushStream containing the items from the Java Stream
*/
- public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) {
+ public <T> PushStream<T> streamOf(Executor executor,
+ ScheduledExecutorService scheduler, Stream<T> items) {
boolean closeExecutorOnClose;
- Executor toUse;
+ Executor workerToUse;
if (executor == null) {
- toUse = Executors.newFixedThreadPool(2);
+ workerToUse = Executors.newFixedThreadPool(2);
closeExecutorOnClose = true;
} else {
- toUse = executor;
+ workerToUse = executor;
closeExecutorOnClose = false;
}
- @SuppressWarnings("resource")
+ boolean releaseSchedulerOnClose;
+ ScheduledExecutorService timerToUse;
+ if (scheduler == null) {
+ timerToUse = acquireScheduler();
+ releaseSchedulerOnClose = true;
+ } else {
+ timerToUse = scheduler;
+ releaseSchedulerOnClose = false;
+ }
+
PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>(
- this, toUse, acquireScheduler(), aec -> {
+ this, new PushStreamExecutors(workerToUse, timerToUse), aec -> {
return () -> { /* No action to take */ };
}) {
@@ -537,7 +736,7 @@ public final class PushStreamProvider {
if (super.begin()) {
Iterator<T> it = items.iterator();
- toUse.execute(() -> pushData(it));
+ executors.execute(() -> pushData(it));
return true;
}
@@ -554,8 +753,9 @@ public final class PushStreamProvider {
close();
return;
} else {
- scheduler.schedule(
- () -> toUse.execute(() -> pushData(it)),
+ executors.schedule(
+ () -> executors
+ .execute(() -> pushData(it)),
returnValue, MILLISECONDS);
return;
}
@@ -568,13 +768,7 @@ public final class PushStreamProvider {
}
};
- stream = stream.onClose(() -> {
- if (closeExecutorOnClose) {
- ((ExecutorService) toUse).shutdown();
- }
- releaseScheduler();
- }).map(x -> x);
-
- return stream;
+ return cleanupThreads(closeExecutorOnClose, workerToUse,
+ releaseSchedulerOnClose, stream);
}
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
index 747b4530d..f47bcb1e7 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
@@ -34,7 +34,8 @@ public interface SimplePushEventSource<T>
* any more events published by it. Calling this method sends a close event
* to all connected consumers. After calling this method any
* {@link PushEventConsumer} that tries to {@link #open(PushEventConsumer)}
- * this source will immediately receive a close event.
+ * this source will immediately receive a close event, and will not see any
+ * remaining buffered events.
*/
@Override
void close();
@@ -53,7 +54,11 @@ public interface SimplePushEventSource<T>
/**
* Close this source for now, but potentially reopen it later. Calling this
- * method asynchronously sends a close event to all connected consumers.
+ * method asynchronously sends a close event to all connected consumers and
+ * then disconnects them. Any events previously queued by the
+ * {@link #publish(Object)} method will be delivered before this close
+ * event.
+ * <p>
* After calling this method any {@link PushEventConsumer} that wishes may
* {@link #open(PushEventConsumer)} this source, and will receive subsequent
* events.
@@ -62,7 +67,11 @@ public interface SimplePushEventSource<T>
/**
* Close this source for now, but potentially reopen it later. Calling this
- * method asynchronously sends an error event to all connected consumers.
+ * method asynchronously sends an error event to all connected consumers and
+ * then disconnects them. Any events previously queued by the
+ * {@link #publish(Object)} method will be delivered before this error
+ * event.
+ * <p>
* After calling this method any {@link PushEventConsumer} that wishes may
* {@link #open(PushEventConsumer)} this source, and will receive subsequent
* events.
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
index e31c9bf59..094a580ab 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.osgi.util.pushstream;
import static java.util.Collections.emptyList;
@@ -7,9 +23,7 @@ import static java.util.stream.Collectors.toList;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import org.osgi.util.promise.Deferred;
@@ -18,12 +32,11 @@ import org.osgi.util.promise.Promises;
class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
implements SimplePushEventSource<T> {
-
+
private final Object lock = new Object();
- private final Executor worker;
-
- private final ScheduledExecutorService scheduler;
+ private final PushStreamExecutors executors;
+ private final PushStreamExecutors sameThread;
private final QueuePolicy<T,U> queuePolicy;
@@ -44,11 +57,13 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
private boolean waitForFinishes;
- public SimplePushEventSourceImpl(Executor worker,
- ScheduledExecutorService scheduler, QueuePolicy<T,U> queuePolicy,
+ public SimplePushEventSourceImpl(PushStreamExecutors executors,
+ QueuePolicy<T,U> queuePolicy,
U queue, int parallelism, Runnable onClose) {
- this.worker = worker;
- this.scheduler = scheduler;
+ this.executors = executors;
+ this.sameThread = new PushStreamExecutors(
+ PushStreamExecutors.inlineExecutor(),
+ executors.scheduledExecutor());
this.queuePolicy = queuePolicy;
this.queue = queue;
this.parallelism = parallelism;
@@ -96,7 +111,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) {
try {
- worker.execute(() -> safePush(pec, event));
+ executors.execute(() -> safePush(pec, event));
} catch (RejectedExecutionException ree) {
// TODO log?
if (!event.isTerminal()) {
@@ -107,21 +122,22 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
}
}
- @SuppressWarnings("boxing")
private Promise<Long> doSendWithBackPressure(
PushEventConsumer< ? super T> pec, PushEvent<T> event) {
- Deferred<Long> d = new Deferred<>();
+ Deferred<Long> d = sameThread.deferred();
try {
- worker.execute(
- () -> d.resolve(System.nanoTime() + safePush(pec, event)));
+ executors.execute(
+ () -> d.resolve(Long.valueOf(
+ System.nanoTime() + safePush(pec, event))));
} catch (RejectedExecutionException ree) {
// TODO log?
+
if (!event.isTerminal()) {
close(PushEvent.error(ree));
- return Promises.resolved(System.nanoTime());
+ d.resolve(Long.valueOf(System.nanoTime()));
} else {
- return Promises
- .resolved(System.nanoTime() + safePush(pec, event));
+ d.resolve(
+ Long.valueOf(System.nanoTime() + safePush(pec, event)));
}
}
return d.getPromise();
@@ -135,7 +151,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
closeConsumer(pec, PushEvent.close());
return -1;
}
- return backpressure;
+ return event.isTerminal() ? -1 : backpressure;
} catch (Exception e) {
// TODO log?
if (!event.isTerminal()) {
@@ -221,13 +237,13 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
"unchecked", "boxing"
})
private void startWorker() {
- worker.execute(() -> {
+ executors.execute(() -> {
try {
for(;;) {
PushEvent<T> event;
List<PushEventConsumer< ? super T>> toCall;
- boolean resetWait = false;
+ boolean resetWait;
synchronized (lock) {
if(waitForFinishes) {
semaphore.release();
@@ -244,6 +260,11 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
break;
}
+ if (connected.isEmpty()) {
+ queue.clear();
+ break;
+ }
+
toCall = new ArrayList<>(connected);
if (event.isTerminal()) {
waitForFinishes = true;
@@ -252,40 +273,39 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
while (!semaphore.tryAcquire(parallelism - 1)) {
lock.wait();
}
- }
- }
-
- List<Promise<Long>> calls = toCall.stream().map(pec -> {
- if (semaphore.tryAcquire()) {
- try {
- return doSendWithBackPressure(pec, event);
- } finally {
- semaphore.release();
- }
} else {
- return Promises.resolved(
- System.nanoTime() + safePush(pec, event));
+ resetWait = false;
}
- }).collect(toList());
-
- long toWait = Promises.all(calls)
- .map(l -> l.stream()
- .max(Long::compareTo)
- .orElseGet(() -> System.nanoTime()))
- .getValue() - System.nanoTime();
+ }
+ Promise<Long> backPressure = deliver(toCall, event);
- if (toWait > 0) {
- scheduler.schedule(this::startWorker, toWait,
- NANOSECONDS);
- return;
- }
+ if (backPressure.isDone()) {
+ handleReset(resetWait);
- if (resetWait == true) {
- synchronized (lock) {
- waitForFinishes = false;
- lock.notifyAll();
+ long toWait = backPressure.getValue()
+ - System.nanoTime();
+
+ if (toWait > 0) {
+ executors.schedule(this::startWorker, toWait,
+ NANOSECONDS);
+ return;
}
+ } else {
+ backPressure.then(p -> {
+ handleReset(resetWait);
+ long toWait = p.getValue() - System.nanoTime();
+
+ if (toWait > 0) {
+ executors.schedule(this::startWorker, toWait,
+ NANOSECONDS);
+ } else {
+ startWorker();
+ }
+ return p;
+ }, p -> close(
+ PushEvent.error((Exception) p.getFailure())));
+ return;
}
}
@@ -304,6 +324,41 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
}
+ private void handleReset(boolean resetWait) {
+ if (resetWait == true) {
+ synchronized (lock) {
+ waitForFinishes = false;
+ lock.notifyAll();
+ }
+ }
+ }
+
+ private Promise<Long> deliver(List<PushEventConsumer< ? super T>> toCall,
+ PushEvent<T> event) {
+ if (toCall.size() == 1) {
+ return doCall(event, toCall.get(0));
+ } else {
+ List<Promise<Long>> calls = toCall.stream().map(pec -> {
+ if (semaphore.tryAcquire()) {
+ return doSendWithBackPressure(pec, event)
+ .onResolve(() -> semaphore.release());
+ } else {
+ return doCall(event, pec);
+ }
+ }).collect(toList());
+ return Promises
+ .all(sameThread.deferred(), calls)
+ .map(l -> l.stream().max(Long::compareTo).orElseGet(
+ () -> Long.valueOf(System.nanoTime())));
+ }
+ }
+
+ private Promise<Long> doCall(PushEvent<T> event,
+ PushEventConsumer< ? super T> pec) {
+ return sameThread.resolved(
+ Long.valueOf(System.nanoTime() + safePush(pec, event)));
+ }
+
@Override
public boolean isConnected() {
synchronized (lock) {
@@ -320,17 +375,17 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
if (connected.isEmpty()) {
if (connectPromise == null) {
- connectPromise = new Deferred<>();
+ connectPromise = executors.deferred();
}
return connectPromise.getPromise();
} else {
- return Promises.resolved(null);
+ return executors.resolved(null);
}
}
}
private Promise<Void> closedConnectPromise() {
- return Promises.failed(new IllegalStateException(
+ return executors.failed(new IllegalStateException(
"This SimplePushEventSource is closed"));
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
index d6116e34e..53400c4c2 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
@@ -1,11 +1,25 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.osgi.util.pushstream;
import static java.util.Optional.ofNullable;
import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -17,9 +31,9 @@ class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T
protected final AtomicReference<AutoCloseable> upstream = new AtomicReference<AutoCloseable>();
UnbufferedPushStreamImpl(PushStreamProvider psp,
- Executor executor, ScheduledExecutorService scheduler,
+ PushStreamExecutors executors,
Function<PushEventConsumer<T>,AutoCloseable> connector) {
- super(psp, executor, scheduler);
+ super(psp, executors);
this.connector = connector;
}

Back to the top