Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java')
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java33
1 files changed, 22 insertions, 11 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
index 7cedafb5c..7a8d1636e 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.osgi.util.pushstream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -5,8 +21,6 @@ import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED;
import static org.osgi.util.pushstream.PushEventConsumer.ABORT;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
@@ -18,8 +32,6 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
private final Semaphore semaphore;
- private final Executor worker;
-
private final QueuePolicy<T, U> queuePolicy;
private final PushbackPolicy<T, U> pushbackPolicy;
@@ -34,15 +46,14 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
private final int parallelism;
BufferedPushStreamImpl(PushStreamProvider psp,
- ScheduledExecutorService scheduler, U eventQueue,
- int parallelism, Executor worker, QueuePolicy<T,U> queuePolicy,
+ PushStreamExecutors executors, U eventQueue, int parallelism,
+ QueuePolicy<T,U> queuePolicy,
PushbackPolicy<T,U> pushbackPolicy,
Function<PushEventConsumer<T>,AutoCloseable> connector) {
- super(psp, worker, scheduler, connector);
+ super(psp, executors, connector);
this.eventQueue = eventQueue;
this.parallelism = parallelism;
this.semaphore = new Semaphore(parallelism);
- this.worker = worker;
this.queuePolicy = queuePolicy;
this.pushbackPolicy = pushbackPolicy;
}
@@ -74,7 +85,7 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
}
private void startWorker() {
- worker.execute(() -> {
+ executors.execute(() -> {
try {
PushEvent< ? extends T> event;
while ((event = eventQueue.poll()) != null) {
@@ -88,12 +99,12 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
close();
return;
} else if(backpressure > 0) {
- scheduler.schedule(this::startWorker, backpressure,
+ executors.schedule(this::startWorker, backpressure,
MILLISECONDS);
return;
}
}
-
+ // Only release this now the queue is empty
semaphore.release();
} catch (Exception e) {
close(PushEvent.error(e));

Back to the top