Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java')
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java88
1 files changed, 88 insertions, 0 deletions
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
new file mode 100644
index 000000000..5ec7cb336
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
@@ -0,0 +1,88 @@
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+ extends AbstractBufferBuilder<PushStream<T>,T,U>
+ implements PushStreamBuilder<T,U> {
+
+ private final PushStreamProvider psp;
+ private final PushEventSource<T> eventSource;
+ private final Executor previousExecutor;
+
+ private boolean unbuffered;
+
+ PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor,
+ PushEventSource<T> eventSource) {
+ this.psp = psp;
+ this.previousExecutor = defaultExecutor;
+ this.eventSource = eventSource;
+ this.worker = defaultExecutor;
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withBuffer(U queue) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withBuffer(queue);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withQueuePolicy(
+ QueuePolicy<T,U> queuePolicy) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withQueuePolicy(queuePolicy);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withQueuePolicy(
+ QueuePolicyOption queuePolicyOption) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withQueuePolicy(
+ queuePolicyOption);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withPushbackPolicy(
+ PushbackPolicy<T,U> pushbackPolicy) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
+ pushbackPolicy);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withPushbackPolicy(
+ PushbackPolicyOption pushbackPolicyOption, long time) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
+ pushbackPolicyOption, time);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withParallelism(int parallelism) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withParallelism(parallelism);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withExecutor(Executor executor) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withExecutor(executor);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> unbuffered() {
+ unbuffered = true;
+ return this;
+ }
+
+ @Override
+ public PushStream<T> create() {
+ if (unbuffered) {
+ return psp.createUnbufferedStream(eventSource, previousExecutor);
+ } else {
+ return psp.createStream(eventSource, concurrency, worker, buffer,
+ bufferingPolicy, backPressure);
+ }
+ }
+}

Back to the top