diff options
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java')
-rw-r--r-- | bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java | 581 |
1 files changed, 0 insertions, 581 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java deleted file mode 100644 index be87c6bce..000000000 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java +++ /dev/null @@ -1,581 +0,0 @@ -/* - * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.osgi.util.pushstream; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED; -import static org.osgi.util.pushstream.PushEvent.*; -import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR; -import static org.osgi.util.pushstream.QueuePolicyOption.FAIL; - -import java.util.Iterator; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Function; -import java.util.stream.Stream; - -/** - * A factory for {@link PushStream} instances, and utility methods for handling - * {@link PushEventSource}s and {@link PushEventConsumer}s - */ -public final class PushStreamProvider { - - private final Lock lock = new ReentrantLock(true); - - private int schedulerReferences; - - private ScheduledExecutorService scheduler; - - private ScheduledExecutorService acquireScheduler() { - try { - lock.lockInterruptibly(); - try { - schedulerReferences += 1; - - if (schedulerReferences == 1) { - scheduler = Executors.newSingleThreadScheduledExecutor(); - } - return scheduler; - } finally { - lock.unlock(); - } - } catch (InterruptedException e) { - throw new IllegalStateException("Unable to acquire the Scheduler", - e); - } - } - - private void releaseScheduler() { - try { - lock.lockInterruptibly(); - try { - schedulerReferences -= 1; - - if (schedulerReferences == 0) { - scheduler.shutdown(); - scheduler = null; - } - } finally { - lock.unlock(); - } - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - /** - * Create a stream with the default configured buffer, executor size, queue, - * queue policy and pushback policy. This is equivalent to calling - * - * <code> - * buildStream(source).create(); - * </code> - * - * <p> - * This stream will be buffered from the event producer, and will honour - * back pressure even if the source does not. - * - * <p> - * Buffered streams are useful for "bursty" event sources which produce a - * number of events close together, then none for some time. These bursts - * can sometimes overwhelm downstream processors. Buffering will not, - * however, protect downstream components from a source which produces - * events faster (on average) than they can be consumed. - * - * <p> - * Event delivery will not begin until a terminal operation is reached on - * the chain of AsyncStreams. Once a terminal operation is reached the - * stream will be connected to the event source. - * - * @param eventSource - * @return A {@link PushStream} with a default initial buffer - */ - public <T> PushStream<T> createStream(PushEventSource<T> eventSource) { - return createStream(eventSource, 1, null, new ArrayBlockingQueue<>(32), - FAIL.getPolicy(), LINEAR.getPolicy(1000)); - } - - /** - * Builds a push stream with custom configuration. - * - * <p> - * - * The resulting {@link PushStream} may be buffered or unbuffered depending - * on how it is configured. - * - * @param eventSource The source of the events - * - * @return A {@link PushStreamBuilder} for the stream - */ - public <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildStream( - PushEventSource<T> eventSource) { - return new PushStreamBuilderImpl<T,U>(this, null, eventSource); - } - - @SuppressWarnings({ - "rawtypes", "unchecked" - }) - <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream( - PushEventSource<T> eventSource, int parallelism, Executor executor, - U queue, QueuePolicy<T,U> queuePolicy, - PushbackPolicy<T,U> pushbackPolicy) { - - if (eventSource == null) { - throw new NullPointerException("There is no source of events"); - } - - if (parallelism < 0) { - throw new IllegalArgumentException( - "The supplied parallelism cannot be less than zero. It was " - + parallelism); - } else if (parallelism == 0) { - parallelism = 1; - } - - boolean closeExecutorOnClose; - Executor toUse; - if (executor == null) { - toUse = Executors.newFixedThreadPool(parallelism); - closeExecutorOnClose = true; - } else { - toUse = executor; - closeExecutorOnClose = false; - } - - if (queue == null) { - queue = (U) new ArrayBlockingQueue(32); - } - - if (queuePolicy == null) { - queuePolicy = FAIL.getPolicy(); - } - - if (pushbackPolicy == null) { - pushbackPolicy = LINEAR.getPolicy(1000); - } - - @SuppressWarnings("resource") - PushStream<T> stream = new BufferedPushStreamImpl<>(this, - acquireScheduler(), queue, parallelism, toUse, queuePolicy, - pushbackPolicy, aec -> { - try { - return eventSource.open(aec); - } catch (Exception e) { - throw new RuntimeException( - "Unable to connect to event source", e); - } - }); - - stream = stream.onClose(() -> { - if (closeExecutorOnClose) { - ((ExecutorService) toUse).shutdown(); - } - releaseScheduler(); - }).map(Function.identity()); - return stream; - } - - <T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource, - Executor executor) { - - boolean closeExecutorOnClose; - Executor toUse; - if (executor == null) { - toUse = Executors.newFixedThreadPool(2); - closeExecutorOnClose = true; - } else { - toUse = executor; - closeExecutorOnClose = false; - } - - @SuppressWarnings("resource") - PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, toUse, - acquireScheduler(), aec -> { - try { - return eventSource.open(aec); - } catch (Exception e) { - throw new RuntimeException( - "Unable to connect to event source", e); - } - }); - - stream = stream.onClose(() -> { - if (closeExecutorOnClose) { - ((ExecutorService) toUse).shutdown(); - } - releaseScheduler(); - }).map(Function.identity()); - - return stream; - } - - /** - * Convert an {@link PushStream} into an {@link PushEventSource}. The first - * call to {@link PushEventSource#open(PushEventConsumer)} will begin event - * processing. - * - * The {@link PushEventSource} will remain active until the backing stream - * is closed, and permits multiple consumers to - * {@link PushEventSource#open(PushEventConsumer)} it. - * - * This is equivalent to: <code> - * buildEventSourceFromStream(stream).create(); - * </code> - * - * @param stream - * @return a {@link PushEventSource} backed by the {@link PushStream} - */ - public <T> PushEventSource<T> createEventSourceFromStream( - PushStream<T> stream) { - return buildEventSourceFromStream(stream).create(); - } - - /** - * Convert an {@link PushStream} into an {@link PushEventSource}. The first - * call to {@link PushEventSource#open(PushEventConsumer)} will begin event - * processing. - * - * The {@link PushEventSource} will remain active until the backing stream - * is closed, and permits multiple consumers to - * {@link PushEventSource#open(PushEventConsumer)} it. - * - * @param stream - * - * @return a {@link PushEventSource} backed by the {@link PushStream} - */ - public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream( - PushStream<T> stream) { - return new AbstractBufferBuilder<PushEventSource<T>,T,U>() { - @Override - public PushEventSource<T> create() { - SimplePushEventSource<T> spes = createSimplePushEventSource( - concurrency, worker, buffer, bufferingPolicy, () -> { - try { - stream.close(); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - }); - spes.connectPromise() - .then(p -> stream.forEach(t -> spes.publish(t)) - .onResolve(() -> spes.close())); - return spes; - } - }; - } - - - /** - * Create a {@link SimplePushEventSource} with the supplied type and default - * buffering behaviours. The SimplePushEventSource will respond to back - * pressure requests from the consumers connected to it. - * - * This is equivalent to: <code> - * buildSimpleEventSource(type).create(); - * </code> - * - * @param type - * @return a {@link SimplePushEventSource} - */ - public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> type) { - return createSimplePushEventSource(1, null, - new ArrayBlockingQueue<>(32), - FAIL.getPolicy(), () -> { /* Nothing else to do */ }); - } - - /** - * - * Build a {@link SimplePushEventSource} with the supplied type and custom - * buffering behaviours. The SimplePushEventSource will respond to back - * pressure requests from the consumers connected to it. - * - * @param type - * - * @return a {@link SimplePushEventSource} - */ - - public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<SimplePushEventSource<T>,T,U> buildSimpleEventSource( - Class<T> type) { - return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() { - @Override - public SimplePushEventSource<T> create() { - return createSimplePushEventSource(concurrency, worker, buffer, - bufferingPolicy, () -> { /* Nothing else to do */ }); - } - }; - } - - @SuppressWarnings({ - "unchecked", "rawtypes" - }) - <T, U extends BlockingQueue<PushEvent< ? extends T>>> SimplePushEventSource<T> createSimplePushEventSource( - int parallelism, Executor executor, U queue, - QueuePolicy<T,U> queuePolicy, Runnable onClose) { - - if (parallelism < 0) { - throw new IllegalArgumentException( - "The supplied parallelism cannot be less than zero. It was " - + parallelism); - } else if (parallelism == 0) { - parallelism = 1; - } - - boolean closeExecutorOnClose; - Executor toUse; - if (executor == null) { - toUse = Executors.newFixedThreadPool(2); - closeExecutorOnClose = true; - } else { - toUse = executor; - closeExecutorOnClose = false; - } - - if (queue == null) { - queue = (U) new ArrayBlockingQueue(32); - } - - if (queuePolicy == null) { - queuePolicy = FAIL.getPolicy(); - } - - SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>( - toUse, acquireScheduler(), queuePolicy, queue, parallelism, - () -> { - try { - onClose.run(); - } catch (Exception e) { - // TODO log this? - } - if (closeExecutorOnClose) { - ((ExecutorService) toUse).shutdown(); - } - releaseScheduler(); - }); - return spes; - } - - /** - * Create a buffered {@link PushEventConsumer} with the default configured - * buffer, executor size, queue, queue policy and pushback policy. This is - * equivalent to calling - * - * <code> - * buildBufferedConsumer(delegate).create(); - * </code> - * - * <p> - * The returned consumer will be buffered from the event source, and will - * honour back pressure requests from its delegate even if the event source - * does not. - * - * <p> - * Buffered consumers are useful for "bursty" event sources which produce a - * number of events close together, then none for some time. These bursts - * can sometimes overwhelm the consumer. Buffering will not, however, - * protect downstream components from a source which produces events faster - * than they can be consumed. - * - * @param delegate - * @return a {@link PushEventConsumer} with a buffer directly before it - */ - public <T> PushEventConsumer<T> createBufferedConsumer( - PushEventConsumer<T> delegate) { - return buildBufferedConsumer(delegate).create(); - } - - /** - * Build a buffered {@link PushEventConsumer} with custom configuration. - * <p> - * The returned consumer will be buffered from the event source, and will - * honour back pressure requests from its delegate even if the event source - * does not. - * <p> - * Buffered consumers are useful for "bursty" event sources which produce a - * number of events close together, then none for some time. These bursts - * can sometimes overwhelm the consumer. Buffering will not, however, - * protect downstream components from a source which produces events faster - * than they can be consumed. - * <p> - * Buffers are also useful as "circuit breakers". If a - * {@link QueuePolicyOption#FAIL} is used then a full buffer will request - * that the stream close, preventing an event storm from reaching the - * client. - * <p> - * Note that this buffered consumer will close when it receives a terminal - * event, or if the delegate returns negative backpressure. No further - * events will be propagated after this time. - * - * @param delegate - * @return a {@link PushEventConsumer} with a buffer directly before it - */ - public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventConsumer<T>,T,U> buildBufferedConsumer( - PushEventConsumer<T> delegate) { - return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() { - @Override - public PushEventConsumer<T> create() { - PushEventPipe<T> pipe = new PushEventPipe<>(); - - createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure) - .forEachEvent(delegate); - - return pipe; - } - }; - } - - static final class PushEventPipe<T> - implements PushEventConsumer<T>, PushEventSource<T> { - - volatile PushEventConsumer< ? super T> delegate; - - @Override - public AutoCloseable open(PushEventConsumer< ? super T> pec) - throws Exception { - return () -> { /* Nothing else to do */ }; - } - - @Override - public long accept(PushEvent< ? extends T> event) throws Exception { - return delegate.accept(event); - } - - } - - /** - * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The - * data from the stream will be pushed into the PushStream synchronously as - * it is opened. This may make terminal operations blocking unless a buffer - * has been added to the {@link PushStream}. Care should be taken with - * infinite {@link Stream}s to avoid blocking indefinitely. - * - * @param items The items to push into the PushStream - * @return A PushStream containing the items from the Java Stream - */ - public <T> PushStream<T> streamOf(Stream<T> items) { - PushEventSource<T> pes = aec -> { - AtomicBoolean closed = new AtomicBoolean(false); - - items.mapToLong(i -> { - try { - long returnValue = closed.get() ? -1 : aec.accept(data(i)); - if (returnValue < 0) { - aec.accept(PushEvent.<T> close()); - } - return returnValue; - } catch (Exception e) { - try { - aec.accept(PushEvent.<T> error(e)); - } catch (Exception e2) {/* No further events needed */} - return -1; - } - }).filter(i -> i < 0).findFirst().orElseGet(() -> { - try { - return aec.accept(PushEvent.<T> close()); - } catch (Exception e) { - return -1; - } - }); - - return () -> closed.set(true); - }; - - return this.<T> createUnbufferedStream(pes, null); - } - - /** - * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The - * data from the stream will be pushed into the PushStream asynchronously - * using the supplied Executor. - * - * @param executor The worker to use to push items from the Stream into the - * PushStream - * @param items The items to push into the PushStream - * @return A PushStream containing the items from the Java Stream - */ - public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) { - - boolean closeExecutorOnClose; - Executor toUse; - if (executor == null) { - toUse = Executors.newFixedThreadPool(2); - closeExecutorOnClose = true; - } else { - toUse = executor; - closeExecutorOnClose = false; - } - - @SuppressWarnings("resource") - PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>( - this, toUse, acquireScheduler(), aec -> { - return () -> { /* No action to take */ }; - }) { - - @Override - protected boolean begin() { - if (super.begin()) { - Iterator<T> it = items.iterator(); - - toUse.execute(() -> pushData(it)); - - return true; - } - return false; - } - - private void pushData(Iterator<T> it) { - while (it.hasNext()) { - try { - long returnValue = closed.get() == CLOSED ? -1 - : handleEvent(data(it.next())); - if (returnValue != 0) { - if (returnValue < 0) { - close(); - return; - } else { - scheduler.schedule( - () -> toUse.execute(() -> pushData(it)), - returnValue, MILLISECONDS); - return; - } - } - } catch (Exception e) { - close(error(e)); - } - } - close(); - } - }; - - stream = stream.onClose(() -> { - if (closeExecutorOnClose) { - ((ExecutorService) toUse).shutdown(); - } - releaseScheduler(); - }).map(Function.identity()); - - return stream; - } -} |