diff options
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java')
-rw-r--r-- | bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java | 40 |
1 files changed, 35 insertions, 5 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java index 0bf8d7127..75fbf07a5 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java @@ -1,22 +1,41 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.osgi.util.pushstream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> extends AbstractBufferBuilder<PushStream<T>,T,U> implements PushStreamBuilder<T,U> { - private final PushStreamProvider psp; + private final PushStreamProvider psp; private final PushEventSource<T> eventSource; private final Executor previousExecutor; + private final ScheduledExecutorService previousScheduler; private boolean unbuffered; PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor, - PushEventSource<T> eventSource) { + ScheduledExecutorService defaultScheduler, PushEventSource<T> eventSource) { this.psp = psp; this.previousExecutor = defaultExecutor; + this.previousScheduler = defaultScheduler; this.eventSource = eventSource; this.worker = defaultExecutor; } @@ -66,11 +85,16 @@ class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> @Override public PushStreamBuilder<T,U> withExecutor(Executor executor) { - unbuffered = false; return (PushStreamBuilder<T,U>) super.withExecutor(executor); } @Override + public PushStreamBuilder<T,U> withScheduler( + ScheduledExecutorService scheduler) { + return (PushStreamBuilder<T,U>) super.withScheduler(scheduler); + } + + @Override public PushStreamBuilder<T,U> unbuffered() { unbuffered = true; return this; @@ -78,10 +102,16 @@ class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> @Override public PushStream<T> build() { + Executor workerToUse = worker == null ? previousExecutor : worker; + ScheduledExecutorService timerToUse = timer == null ? previousScheduler + : timer; + if (unbuffered) { - return psp.createUnbufferedStream(eventSource, previousExecutor); + return psp.createUnbufferedStream(eventSource, workerToUse, + timerToUse); } else { - return psp.createStream(eventSource, concurrency, worker, buffer, + return psp.createStream(eventSource, concurrency, workerToUse, + timerToUse, buffer, bufferingPolicy, backPressure); } } |