diff options
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java')
-rw-r--r-- | bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java | 33 |
1 files changed, 22 insertions, 11 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java index 7cedafb5c..7a8d1636e 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -5,8 +21,6 @@ import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED; import static org.osgi.util.pushstream.PushEventConsumer.ABORT; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -18,8 +32,6 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> private final Semaphore semaphore; - private final Executor worker; - private final QueuePolicy<T, U> queuePolicy; private final PushbackPolicy<T, U> pushbackPolicy; @@ -34,15 +46,14 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> private final int parallelism; BufferedPushStreamImpl(PushStreamProvider psp, - ScheduledExecutorService scheduler, U eventQueue, - int parallelism, Executor worker, QueuePolicy<T,U> queuePolicy, + PushStreamExecutors executors, U eventQueue, int parallelism, + QueuePolicy<T,U> queuePolicy, PushbackPolicy<T,U> pushbackPolicy, Function<PushEventConsumer<T>,AutoCloseable> connector) { - super(psp, worker, scheduler, connector); + super(psp, executors, connector); this.eventQueue = eventQueue; this.parallelism = parallelism; this.semaphore = new Semaphore(parallelism); - this.worker = worker; this.queuePolicy = queuePolicy; this.pushbackPolicy = pushbackPolicy; } @@ -74,7 +85,7 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> } private void startWorker() { - worker.execute(() -> { + executors.execute(() -> { try { PushEvent< ? extends T> event; while ((event = eventQueue.poll()) != null) { @@ -88,12 +99,12 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>> close(); return; } else if(backpressure > 0) { - scheduler.schedule(this::startWorker, backpressure, + executors.schedule(this::startWorker, backpressure, MILLISECONDS); return; } } - + // Only release this now the queue is empty semaphore.release(); } catch (Exception e) { close(PushEvent.error(e)); |