diff options
Diffstat (limited to 'bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java')
-rw-r--r-- | bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java | 581 |
1 files changed, 581 insertions, 0 deletions
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java new file mode 100644 index 000000000..be87c6bce --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java @@ -0,0 +1,581 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED; +import static org.osgi.util.pushstream.PushEvent.*; +import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR; +import static org.osgi.util.pushstream.QueuePolicyOption.FAIL; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * A factory for {@link PushStream} instances, and utility methods for handling + * {@link PushEventSource}s and {@link PushEventConsumer}s + */ +public final class PushStreamProvider { + + private final Lock lock = new ReentrantLock(true); + + private int schedulerReferences; + + private ScheduledExecutorService scheduler; + + private ScheduledExecutorService acquireScheduler() { + try { + lock.lockInterruptibly(); + try { + schedulerReferences += 1; + + if (schedulerReferences == 1) { + scheduler = Executors.newSingleThreadScheduledExecutor(); + } + return scheduler; + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + throw new IllegalStateException("Unable to acquire the Scheduler", + e); + } + } + + private void releaseScheduler() { + try { + lock.lockInterruptibly(); + try { + schedulerReferences -= 1; + + if (schedulerReferences == 0) { + scheduler.shutdown(); + scheduler = null; + } + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + /** + * Create a stream with the default configured buffer, executor size, queue, + * queue policy and pushback policy. This is equivalent to calling + * + * <code> + * buildStream(source).create(); + * </code> + * + * <p> + * This stream will be buffered from the event producer, and will honour + * back pressure even if the source does not. + * + * <p> + * Buffered streams are useful for "bursty" event sources which produce a + * number of events close together, then none for some time. These bursts + * can sometimes overwhelm downstream processors. Buffering will not, + * however, protect downstream components from a source which produces + * events faster (on average) than they can be consumed. + * + * <p> + * Event delivery will not begin until a terminal operation is reached on + * the chain of AsyncStreams. Once a terminal operation is reached the + * stream will be connected to the event source. + * + * @param eventSource + * @return A {@link PushStream} with a default initial buffer + */ + public <T> PushStream<T> createStream(PushEventSource<T> eventSource) { + return createStream(eventSource, 1, null, new ArrayBlockingQueue<>(32), + FAIL.getPolicy(), LINEAR.getPolicy(1000)); + } + + /** + * Builds a push stream with custom configuration. + * + * <p> + * + * The resulting {@link PushStream} may be buffered or unbuffered depending + * on how it is configured. + * + * @param eventSource The source of the events + * + * @return A {@link PushStreamBuilder} for the stream + */ + public <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildStream( + PushEventSource<T> eventSource) { + return new PushStreamBuilderImpl<T,U>(this, null, eventSource); + } + + @SuppressWarnings({ + "rawtypes", "unchecked" + }) + <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream( + PushEventSource<T> eventSource, int parallelism, Executor executor, + U queue, QueuePolicy<T,U> queuePolicy, + PushbackPolicy<T,U> pushbackPolicy) { + + if (eventSource == null) { + throw new NullPointerException("There is no source of events"); + } + + if (parallelism < 0) { + throw new IllegalArgumentException( + "The supplied parallelism cannot be less than zero. It was " + + parallelism); + } else if (parallelism == 0) { + parallelism = 1; + } + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(parallelism); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + if (queue == null) { + queue = (U) new ArrayBlockingQueue(32); + } + + if (queuePolicy == null) { + queuePolicy = FAIL.getPolicy(); + } + + if (pushbackPolicy == null) { + pushbackPolicy = LINEAR.getPolicy(1000); + } + + @SuppressWarnings("resource") + PushStream<T> stream = new BufferedPushStreamImpl<>(this, + acquireScheduler(), queue, parallelism, toUse, queuePolicy, + pushbackPolicy, aec -> { + try { + return eventSource.open(aec); + } catch (Exception e) { + throw new RuntimeException( + "Unable to connect to event source", e); + } + }); + + stream = stream.onClose(() -> { + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }).map(Function.identity()); + return stream; + } + + <T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource, + Executor executor) { + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(2); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + @SuppressWarnings("resource") + PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, toUse, + acquireScheduler(), aec -> { + try { + return eventSource.open(aec); + } catch (Exception e) { + throw new RuntimeException( + "Unable to connect to event source", e); + } + }); + + stream = stream.onClose(() -> { + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }).map(Function.identity()); + + return stream; + } + + /** + * Convert an {@link PushStream} into an {@link PushEventSource}. The first + * call to {@link PushEventSource#open(PushEventConsumer)} will begin event + * processing. + * + * The {@link PushEventSource} will remain active until the backing stream + * is closed, and permits multiple consumers to + * {@link PushEventSource#open(PushEventConsumer)} it. + * + * This is equivalent to: <code> + * buildEventSourceFromStream(stream).create(); + * </code> + * + * @param stream + * @return a {@link PushEventSource} backed by the {@link PushStream} + */ + public <T> PushEventSource<T> createEventSourceFromStream( + PushStream<T> stream) { + return buildEventSourceFromStream(stream).create(); + } + + /** + * Convert an {@link PushStream} into an {@link PushEventSource}. The first + * call to {@link PushEventSource#open(PushEventConsumer)} will begin event + * processing. + * + * The {@link PushEventSource} will remain active until the backing stream + * is closed, and permits multiple consumers to + * {@link PushEventSource#open(PushEventConsumer)} it. + * + * @param stream + * + * @return a {@link PushEventSource} backed by the {@link PushStream} + */ + public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream( + PushStream<T> stream) { + return new AbstractBufferBuilder<PushEventSource<T>,T,U>() { + @Override + public PushEventSource<T> create() { + SimplePushEventSource<T> spes = createSimplePushEventSource( + concurrency, worker, buffer, bufferingPolicy, () -> { + try { + stream.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }); + spes.connectPromise() + .then(p -> stream.forEach(t -> spes.publish(t)) + .onResolve(() -> spes.close())); + return spes; + } + }; + } + + + /** + * Create a {@link SimplePushEventSource} with the supplied type and default + * buffering behaviours. The SimplePushEventSource will respond to back + * pressure requests from the consumers connected to it. + * + * This is equivalent to: <code> + * buildSimpleEventSource(type).create(); + * </code> + * + * @param type + * @return a {@link SimplePushEventSource} + */ + public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> type) { + return createSimplePushEventSource(1, null, + new ArrayBlockingQueue<>(32), + FAIL.getPolicy(), () -> { /* Nothing else to do */ }); + } + + /** + * + * Build a {@link SimplePushEventSource} with the supplied type and custom + * buffering behaviours. The SimplePushEventSource will respond to back + * pressure requests from the consumers connected to it. + * + * @param type + * + * @return a {@link SimplePushEventSource} + */ + + public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<SimplePushEventSource<T>,T,U> buildSimpleEventSource( + Class<T> type) { + return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() { + @Override + public SimplePushEventSource<T> create() { + return createSimplePushEventSource(concurrency, worker, buffer, + bufferingPolicy, () -> { /* Nothing else to do */ }); + } + }; + } + + @SuppressWarnings({ + "unchecked", "rawtypes" + }) + <T, U extends BlockingQueue<PushEvent< ? extends T>>> SimplePushEventSource<T> createSimplePushEventSource( + int parallelism, Executor executor, U queue, + QueuePolicy<T,U> queuePolicy, Runnable onClose) { + + if (parallelism < 0) { + throw new IllegalArgumentException( + "The supplied parallelism cannot be less than zero. It was " + + parallelism); + } else if (parallelism == 0) { + parallelism = 1; + } + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(2); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + if (queue == null) { + queue = (U) new ArrayBlockingQueue(32); + } + + if (queuePolicy == null) { + queuePolicy = FAIL.getPolicy(); + } + + SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>( + toUse, acquireScheduler(), queuePolicy, queue, parallelism, + () -> { + try { + onClose.run(); + } catch (Exception e) { + // TODO log this? + } + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }); + return spes; + } + + /** + * Create a buffered {@link PushEventConsumer} with the default configured + * buffer, executor size, queue, queue policy and pushback policy. This is + * equivalent to calling + * + * <code> + * buildBufferedConsumer(delegate).create(); + * </code> + * + * <p> + * The returned consumer will be buffered from the event source, and will + * honour back pressure requests from its delegate even if the event source + * does not. + * + * <p> + * Buffered consumers are useful for "bursty" event sources which produce a + * number of events close together, then none for some time. These bursts + * can sometimes overwhelm the consumer. Buffering will not, however, + * protect downstream components from a source which produces events faster + * than they can be consumed. + * + * @param delegate + * @return a {@link PushEventConsumer} with a buffer directly before it + */ + public <T> PushEventConsumer<T> createBufferedConsumer( + PushEventConsumer<T> delegate) { + return buildBufferedConsumer(delegate).create(); + } + + /** + * Build a buffered {@link PushEventConsumer} with custom configuration. + * <p> + * The returned consumer will be buffered from the event source, and will + * honour back pressure requests from its delegate even if the event source + * does not. + * <p> + * Buffered consumers are useful for "bursty" event sources which produce a + * number of events close together, then none for some time. These bursts + * can sometimes overwhelm the consumer. Buffering will not, however, + * protect downstream components from a source which produces events faster + * than they can be consumed. + * <p> + * Buffers are also useful as "circuit breakers". If a + * {@link QueuePolicyOption#FAIL} is used then a full buffer will request + * that the stream close, preventing an event storm from reaching the + * client. + * <p> + * Note that this buffered consumer will close when it receives a terminal + * event, or if the delegate returns negative backpressure. No further + * events will be propagated after this time. + * + * @param delegate + * @return a {@link PushEventConsumer} with a buffer directly before it + */ + public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventConsumer<T>,T,U> buildBufferedConsumer( + PushEventConsumer<T> delegate) { + return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() { + @Override + public PushEventConsumer<T> create() { + PushEventPipe<T> pipe = new PushEventPipe<>(); + + createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure) + .forEachEvent(delegate); + + return pipe; + } + }; + } + + static final class PushEventPipe<T> + implements PushEventConsumer<T>, PushEventSource<T> { + + volatile PushEventConsumer< ? super T> delegate; + + @Override + public AutoCloseable open(PushEventConsumer< ? super T> pec) + throws Exception { + return () -> { /* Nothing else to do */ }; + } + + @Override + public long accept(PushEvent< ? extends T> event) throws Exception { + return delegate.accept(event); + } + + } + + /** + * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The + * data from the stream will be pushed into the PushStream synchronously as + * it is opened. This may make terminal operations blocking unless a buffer + * has been added to the {@link PushStream}. Care should be taken with + * infinite {@link Stream}s to avoid blocking indefinitely. + * + * @param items The items to push into the PushStream + * @return A PushStream containing the items from the Java Stream + */ + public <T> PushStream<T> streamOf(Stream<T> items) { + PushEventSource<T> pes = aec -> { + AtomicBoolean closed = new AtomicBoolean(false); + + items.mapToLong(i -> { + try { + long returnValue = closed.get() ? -1 : aec.accept(data(i)); + if (returnValue < 0) { + aec.accept(PushEvent.<T> close()); + } + return returnValue; + } catch (Exception e) { + try { + aec.accept(PushEvent.<T> error(e)); + } catch (Exception e2) {/* No further events needed */} + return -1; + } + }).filter(i -> i < 0).findFirst().orElseGet(() -> { + try { + return aec.accept(PushEvent.<T> close()); + } catch (Exception e) { + return -1; + } + }); + + return () -> closed.set(true); + }; + + return this.<T> createUnbufferedStream(pes, null); + } + + /** + * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The + * data from the stream will be pushed into the PushStream asynchronously + * using the supplied Executor. + * + * @param executor The worker to use to push items from the Stream into the + * PushStream + * @param items The items to push into the PushStream + * @return A PushStream containing the items from the Java Stream + */ + public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) { + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(2); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + @SuppressWarnings("resource") + PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>( + this, toUse, acquireScheduler(), aec -> { + return () -> { /* No action to take */ }; + }) { + + @Override + protected boolean begin() { + if (super.begin()) { + Iterator<T> it = items.iterator(); + + toUse.execute(() -> pushData(it)); + + return true; + } + return false; + } + + private void pushData(Iterator<T> it) { + while (it.hasNext()) { + try { + long returnValue = closed.get() == CLOSED ? -1 + : handleEvent(data(it.next())); + if (returnValue != 0) { + if (returnValue < 0) { + close(); + return; + } else { + scheduler.schedule( + () -> toUse.execute(() -> pushData(it)), + returnValue, MILLISECONDS); + return; + } + } + } catch (Exception e) { + close(error(e)); + } + } + close(); + } + }; + + stream = stream.onClose(() -> { + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }).map(Function.identity()); + + return stream; + } +} |