diff options
Diffstat (limited to 'bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java')
-rw-r--r-- | bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java new file mode 100644 index 000000000..5ec7cb336 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java @@ -0,0 +1,88 @@ +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> + extends AbstractBufferBuilder<PushStream<T>,T,U> + implements PushStreamBuilder<T,U> { + + private final PushStreamProvider psp; + private final PushEventSource<T> eventSource; + private final Executor previousExecutor; + + private boolean unbuffered; + + PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor, + PushEventSource<T> eventSource) { + this.psp = psp; + this.previousExecutor = defaultExecutor; + this.eventSource = eventSource; + this.worker = defaultExecutor; + } + + @Override + public PushStreamBuilder<T,U> withBuffer(U queue) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withBuffer(queue); + } + + @Override + public PushStreamBuilder<T,U> withQueuePolicy( + QueuePolicy<T,U> queuePolicy) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withQueuePolicy(queuePolicy); + } + + @Override + public PushStreamBuilder<T,U> withQueuePolicy( + QueuePolicyOption queuePolicyOption) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withQueuePolicy( + queuePolicyOption); + } + + @Override + public PushStreamBuilder<T,U> withPushbackPolicy( + PushbackPolicy<T,U> pushbackPolicy) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withPushbackPolicy( + pushbackPolicy); + } + + @Override + public PushStreamBuilder<T,U> withPushbackPolicy( + PushbackPolicyOption pushbackPolicyOption, long time) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withPushbackPolicy( + pushbackPolicyOption, time); + } + + @Override + public PushStreamBuilder<T,U> withParallelism(int parallelism) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withParallelism(parallelism); + } + + @Override + public PushStreamBuilder<T,U> withExecutor(Executor executor) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withExecutor(executor); + } + + @Override + public PushStreamBuilder<T,U> unbuffered() { + unbuffered = true; + return this; + } + + @Override + public PushStream<T> create() { + if (unbuffered) { + return psp.createUnbufferedStream(eventSource, previousExecutor); + } else { + return psp.createStream(eventSource, concurrency, worker, buffer, + bufferingPolicy, backPressure); + } + } +} |