Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java')
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java11
1 files changed, 7 insertions, 4 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
index 7a8d1636e..dfb9eeeaa 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
@@ -25,6 +25,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import org.osgi.util.promise.PromiseFactory;
+
class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> {
@@ -46,11 +48,11 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
private final int parallelism;
BufferedPushStreamImpl(PushStreamProvider psp,
- PushStreamExecutors executors, U eventQueue, int parallelism,
+ PromiseFactory promiseFactory, U eventQueue, int parallelism,
QueuePolicy<T,U> queuePolicy,
PushbackPolicy<T,U> pushbackPolicy,
Function<PushEventConsumer<T>,AutoCloseable> connector) {
- super(psp, executors, connector);
+ super(psp, promiseFactory, connector);
this.eventQueue = eventQueue;
this.parallelism = parallelism;
this.semaphore = new Semaphore(parallelism);
@@ -85,7 +87,7 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
}
private void startWorker() {
- executors.execute(() -> {
+ promiseFactory.executor().execute(() -> {
try {
PushEvent< ? extends T> event;
while ((event = eventQueue.poll()) != null) {
@@ -99,7 +101,8 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
close();
return;
} else if(backpressure > 0) {
- executors.schedule(this::startWorker, backpressure,
+ promiseFactory.scheduledExecutor().schedule(
+ this::startWorker, backpressure,
MILLISECONDS);
return;
}

Back to the top