diff options
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java')
-rw-r--r-- | bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java index 7a8d1636e..dfb9eeeaa 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java @@ -25,6 +25,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import org.osgi.util.promise.PromiseFactory; + class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> { @@ -46,11 +48,11 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> private final int parallelism; BufferedPushStreamImpl(PushStreamProvider psp, - PushStreamExecutors executors, U eventQueue, int parallelism, + PromiseFactory promiseFactory, U eventQueue, int parallelism, QueuePolicy<T,U> queuePolicy, PushbackPolicy<T,U> pushbackPolicy, Function<PushEventConsumer<T>,AutoCloseable> connector) { - super(psp, executors, connector); + super(psp, promiseFactory, connector); this.eventQueue = eventQueue; this.parallelism = parallelism; this.semaphore = new Semaphore(parallelism); @@ -85,7 +87,7 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> } private void startWorker() { - executors.execute(() -> { + promiseFactory.executor().execute(() -> { try { PushEvent< ? extends T> event; while ((event = eventQueue.poll()) != null) { @@ -99,7 +101,8 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> close(); return; } else if(backpressure > 0) { - executors.schedule(this::startWorker, backpressure, + promiseFactory.scheduledExecutor().schedule( + this::startWorker, backpressure, MILLISECONDS); return; } |