Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java')
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java609
1 files changed, 609 insertions, 0 deletions
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java
new file mode 100644
index 000000000..c26bc8c4d
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java
@@ -0,0 +1,609 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.TimeoutException;
+
+/**
+ * A Push Stream fulfills the same role as the Java 8 stream but it reverses the
+ * control direction. The Java 8 stream is pull based and this is push based. A
+ * Push Stream makes it possible to build a pipeline of transformations using a
+ * builder kind of model. Just like streams, it provides a number of terminating
+ * methods that will actually open the channel and perform the processing until
+ * the channel is closed (The source sends a Close event). The results of the
+ * processing will be send to a Promise, just like any error events. A stream
+ * can be used multiple times. The Push Stream represents a pipeline. Upstream
+ * is in the direction of the source, downstream is in the direction of the
+ * terminating method. Events are sent downstream asynchronously with no
+ * guarantee for ordering or concurrency. Methods are available to provide
+ * serialization of the events and splitting in background threads.
+ *
+ * @param <T> The Payload type
+ */
+@ProviderType
+public interface PushStream<T> extends AutoCloseable {
+
+ /**
+ * Must be run after the channel is closed. This handler will run after the
+ * downstream methods have processed the close event and before the upstream
+ * methods have closed.
+ *
+ * @param closeHandler Will be called on close
+ * @return This stream
+ */
+ PushStream<T> onClose(Runnable closeHandler);
+
+ /**
+ * Must be run after the channel is closed. This handler will run after the
+ * downstream methods have processed the close event and before the upstream
+ * methods have closed.
+ *
+ * @param closeHandler Will be called on close
+ * @return This stream
+ */
+ PushStream<T> onError(Consumer< ? super Throwable> closeHandler);
+
+ /**
+ * Only pass events downstream when the predicate tests true.
+ *
+ * @param predicate The predicate that is tested (not null)
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> filter(Predicate< ? super T> predicate);
+
+ /**
+ * Map a payload value.
+ *
+ * @param mapper The map function
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> map(Function< ? super T, ? extends R> mapper);
+
+ /**
+ * Flat map the payload value (turn one event into 0..n events of
+ * potentially another type).
+ *
+ * @param mapper The flat map function
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> flatMap(
+ Function< ? super T, ? extends PushStream< ? extends R>> mapper);
+
+ /**
+ * Remove any duplicates. Notice that this can be expensive in a large
+ * stream since it must track previous payloads.
+ *
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> distinct();
+
+ /**
+ * Sorted the elements, assuming that T extends Comparable. This is of
+ * course expensive for large or infinite streams since it requires
+ * buffering the stream until close.
+ *
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> sorted();
+
+ /**
+ * Sorted the elements with the given comparator. This is of course
+ * expensive for large or infinite streams since it requires buffering the
+ * stream until close.
+ *
+ * @param comparator
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> sorted(Comparator< ? super T> comparator);
+
+ /**
+ * Automatically close the channel after the maxSize number of elements is
+ * received.
+ *
+ * @param maxSize Maximum number of elements has been received
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> limit(long maxSize);
+
+ /**
+ * Automatically close the channel after the given amount of time has
+ * elapsed.
+ *
+ * @param maxTime The maximum time that the stream should remain open
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> limit(Duration maxTime);
+
+ /**
+ * Automatically fail the channel if no events are received for the
+ * indicated length of time. If the timeout is reached then a failure event
+ * containing a {@link TimeoutException} will be sent.
+ *
+ * @param idleTime The length of time that the stream should remain open
+ * when no events are being received.
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> timeout(Duration idleTime);
+
+ /**
+ * Skip a number of events in the channel.
+ *
+ * @param n number of elements to skip
+ * @throws IllegalArgumentException if the number of events to skip is
+ * negative
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> skip(long n);
+
+ /**
+ * Execute the downstream events in up to n background threads. If more
+ * requests are outstanding apply delay * nr of delayed threads back
+ * pressure. A downstream channel that is closed or throws an exception will
+ * cause all execution to cease and the stream to close
+ *
+ * @param n number of simultaneous background threads to use
+ * @param delay Nr of ms/thread that is queued back pressure
+ * @param e an executor to use for the background threads.
+ * @return Builder style (can be a new or the same object)
+ * @throws IllegalArgumentException if the number of threads is < 1 or the
+ * delay is < 0
+ * @throws NullPointerException if the Executor is null
+ */
+ PushStream<T> fork(int n, int delay, Executor e)
+ throws IllegalArgumentException, NullPointerException;
+
+ /**
+ * Buffer the events in a queue using default values for the queue size and
+ * other behaviours. Buffered work will be processed asynchronously in the
+ * rest of the chain. Buffering also blocks the transmission of back
+ * pressure to previous elements in the chain, although back pressure is
+ * honoured by the buffer.
+ * <p>
+ * Buffers are useful for "bursty" event sources which produce a number of
+ * events close together, then none for some time. These bursts can
+ * sometimes overwhelm downstream event consumers. Buffering will not,
+ * however, protect downstream components from a source which produces
+ * events faster than they can be consumed. For fast sources
+ * {@link #filter(Predicate)} and {@link #coalesce(int, Function)}
+ * {@link #fork(int, int, Executor)} are better choices.
+ *
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> buffer();
+
+ /**
+ * Build a buffer to enqueue events in a queue using custom values for the
+ * queue size and other behaviours. Buffered work will be processed
+ * asynchronously in the rest of the chain. Buffering also blocks the
+ * transmission of back pressure to previous elements in the chain, although
+ * back pressure is honoured by the buffer.
+ * <p>
+ * Buffers are useful for "bursty" event sources which produce a number of
+ * events close together, then none for some time. These bursts can
+ * sometimes overwhelm downstream event consumers. Buffering will not,
+ * however, protect downstream components from a source which produces
+ * events faster than they can be consumed. For fast sources
+ * {@link #filter(Predicate)} and {@link #coalesce(int, Function)}
+ * {@link #fork(int, int, Executor)} are better choices.
+ * <p>
+ * Buffers are also useful as "circuit breakers" in the pipeline. If a
+ * {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger
+ * the stream to close, preventing an event storm from reaching the client.
+ *
+ * @param parallelism
+ * @param executor
+ * @param queue
+ * @param queuePolicy
+ * @param pushbackPolicy
+ * @return Builder style (can be a new or the same object)
+ */
+ <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer();
+
+ /**
+ * Merge in the events from another source. The resulting channel is not
+ * closed until this channel and the channel from the source are closed.
+ *
+ * @param source The source to merge in.
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> merge(PushEventSource< ? extends T> source);
+
+ /**
+ * Merge in the events from another PushStream. The resulting channel is not
+ * closed until this channel and the channel from the source are closed.
+ *
+ * @param source The source to merge in.
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> merge(PushStream< ? extends T> source);
+
+ /**
+ * Split the events to different streams based on a predicate. If the
+ * predicate is true, the event is dispatched to that channel on the same
+ * position. All predicates are tested for every event.
+ * <p>
+ * This method differs from other methods of AsyncStream in three
+ * significant ways:
+ * <ul>
+ * <li>The return value contains multiple streams.</li>
+ * <li>This stream will only close when all of these child streams have
+ * closed.</li>
+ * <li>Event delivery is made to all open children that accept the event.
+ * </li>
+ * </ul>
+ *
+ * @param predicates the predicates to test
+ * @return streams that map to the predicates
+ */
+ @SuppressWarnings("unchecked")
+ PushStream<T>[] split(Predicate< ? super T>... predicates);
+
+ /**
+ * Ensure that any events are delivered sequentially. That is, no
+ * overlapping calls downstream. This can be used to turn a forked stream
+ * (where for example a heavy conversion is done in multiple threads) back
+ * into a sequential stream so a reduce is simple to do.
+ *
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> sequential();
+
+ /**
+ * Coalesces a number of events into a new type of event. The input events
+ * are forwarded to a accumulator function. This function returns an
+ * Optional. If the optional is present, it's value is send downstream,
+ * otherwise it is ignored.
+ *
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> coalesce(Function< ? super T,Optional<R>> f);
+
+ /**
+ * Coalesces a number of events into a new type of event. A fixed number of
+ * input events are forwarded to a accumulator function. This function
+ * returns new event data to be forwarded on.
+ *
+ * @param count
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f);
+
+ /**
+ * Coalesces a number of events into a new type of event. A variable number
+ * of input events are forwarded to a accumulator function. The number of
+ * events to be forwarded is determined by calling the count function. The
+ * accumulator function then returns new event data to be forwarded on.
+ *
+ * @param count
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ public <R> PushStream<R> coalesce(IntSupplier count,
+ Function<Collection<T>,R> f);
+
+ /**
+ * Buffers a number of events over a fixed time interval and then forwards
+ * the events to an accumulator function. This function returns new event
+ * data to be forwarded on. Note that:
+ * <ul>
+ * <li>The collection forwarded to the accumulator function will be empty if
+ * no events arrived during the time interval.</li>
+ * <li>The accumulator function will be run and the forwarded event
+ * delivered as a different task, (and therefore potentially on a different
+ * thread) from the one that delivered the event to this {@link PushStream}.
+ * </li>
+ * <li>Due to the buffering and asynchronous delivery required, this method
+ * prevents the propagation of back-pressure to earlier stages</li>
+ * </ul>
+ *
+ * @param d
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> window(Duration d, Function<Collection<T>,R> f);
+
+ /**
+ * Buffers a number of events over a fixed time interval and then forwards
+ * the events to an accumulator function. This function returns new event
+ * data to be forwarded on. Note that:
+ * <ul>
+ * <li>The collection forwarded to the accumulator function will be empty if
+ * no events arrived during the time interval.</li>
+ * <li>The accumulator function will be run and the forwarded event
+ * delivered by a task given to the supplied executor.</li>
+ * <li>Due to the buffering and asynchronous delivery required, this method
+ * prevents the propagation of back-pressure to earlier stages</li>
+ * </ul>
+ *
+ * @param d
+ * @param executor
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> window(Duration d, Executor executor,
+ Function<Collection<T>,R> f);
+
+ /**
+ * Buffers a number of events over a variable time interval and then
+ * forwards the events to an accumulator function. The length of time over
+ * which events are buffered is determined by the time function. A maximum
+ * number of events can also be requested, if this number of events is
+ * reached then the accumulator will be called early. The accumulator
+ * function returns new event data to be forwarded on. It is also given the
+ * length of time for which the buffer accumulated data. This may be less
+ * than the requested interval if the buffer reached the maximum number of
+ * requested events early. Note that:
+ * <ul>
+ * <li>The collection forwarded to the accumulator function will be empty if
+ * no events arrived during the time interval.</li>
+ * <li>The accumulator function will be run and the forwarded event
+ * delivered as a different task, (and therefore potentially on a different
+ * thread) from the one that delivered the event to this {@link PushStream}.
+ * </li>
+ * <li>Due to the buffering and asynchronous delivery required, this method
+ * prevents the propagation of back-pressure to earlier stages</li>
+ * <li>If the window finishes by hitting the maximum number of events then
+ * the remaining time in the window will be applied as back-pressure to the
+ * previous stage, attempting to slow the producer to the expected windowing
+ * threshold.</li>
+ * </ul>
+ *
+ * @param timeSupplier
+ * @param maxEvents
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> window(Supplier<Duration> timeSupplier,
+ IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> f);
+
+ /**
+ * Buffers a number of events over a variable time interval and then
+ * forwards the events to an accumulator function. The length of time over
+ * which events are buffered is determined by the time function. A maximum
+ * number of events can also be requested, if this number of events is
+ * reached then the accumulator will be called early. The accumulator
+ * function returns new event data to be forwarded on. It is also given the
+ * length of time for which the buffer accumulated data. This may be less
+ * than the requested interval if the buffer reached the maximum number of
+ * requested events early. Note that:
+ * <ul>
+ * <li>The collection forwarded to the accumulator function will be empty if
+ * no events arrived during the time interval.</li>
+ * <li>The accumulator function will be run and the forwarded event
+ * delivered as a different task, (and therefore potentially on a different
+ * thread) from the one that delivered the event to this {@link PushStream}.
+ * </li>
+ * <li>If the window finishes by hitting the maximum number of events then
+ * the remaining time in the window will be applied as back-pressure to the
+ * previous stage, attempting to slow the producer to the expected windowing
+ * threshold.</li>
+ * </ul>
+ *
+ * @param timeSupplier
+ * @param maxEvents
+ * @param executor
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> window(Supplier<Duration> timeSupplier,
+ IntSupplier maxEvents, Executor executor,
+ BiFunction<Long,Collection<T>,R> f);
+
+ /**
+ * Execute the action for each event received until the channel is closed.
+ * This is a terminating method, the returned promise is resolved when the
+ * channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param action The action to perform
+ * @return A promise that is resolved when the channel closes.
+ */
+ Promise<Void> forEach(Consumer< ? super T> action);
+
+ /**
+ * Collect the payloads in an Object array after the channel is closed. This
+ * is a terminating method, the returned promise is resolved when the
+ * channel is closed.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @return A promise that is resolved with all the payloads received over
+ * the channel
+ */
+ Promise<Object[]> toArray();
+
+ /**
+ * Collect the payloads in an Object array after the channel is closed. This
+ * is a terminating method, the returned promise is resolved when the
+ * channel is closed. The type of the array is handled by the caller using a
+ * generator function that gets the length of the desired array.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param generator
+ * @return A promise that is resolved with all the payloads received over
+ * the channel
+ */
+ <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator);
+
+ /**
+ * Standard reduce, see Stream. The returned promise will be resolved when
+ * the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param identity The identity/begin value
+ * @param accumulator The accumulator
+ * @return A
+ */
+ Promise<T> reduce(T identity, BinaryOperator<T> accumulator);
+
+ /**
+ * Standard reduce without identity, so the return is an Optional. The
+ * returned promise will be resolved when the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param accumulator The accumulator
+ * @return an Optional
+ */
+ Promise<Optional<T>> reduce(BinaryOperator<T> accumulator);
+
+ /**
+ * Standard reduce with identity, accumulator and combiner. The returned
+ * promise will be resolved when the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param identity
+ * @param accumulator
+ * @param combiner combines to U's into one U (e.g. how combine two lists)
+ * @return The promise
+ */
+ <U> Promise<U> reduce(U identity, BiFunction<U, ? super T,U> accumulator,
+ BinaryOperator<U> combiner);
+
+ /**
+ * See Stream. Will resolve onces the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param collector
+ * @return A Promise representing the collected results
+ */
+ <R, A> Promise<R> collect(Collector< ? super T,A,R> collector);
+
+ /**
+ * See Stream. Will resolve onces the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param comparator
+ * @return A Promise representing the minimum value, or null if no values
+ * are seen before the end of the stream
+ */
+ Promise<Optional<T>> min(Comparator< ? super T> comparator);
+
+ /**
+ * See Stream. Will resolve onces the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param comparator
+ * @return A Promise representing the maximum value, or null if no values
+ * are seen before the end of the stream
+ */
+ Promise<Optional<T>> max(Comparator< ? super T> comparator);
+
+ /**
+ * See Stream. Will resolve onces the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @return A Promise representing the number of values in the stream
+ */
+ Promise<Long> count();
+
+ /**
+ * Close the channel and resolve the promise with true when the predicate
+ * matches a payload. If the channel is closed before the predicate matches,
+ * the promise is resolved with false.
+ * <p>
+ * This is a <strong>short circuiting terminal operation</strong>
+ *
+ * @param predicate
+ * @return A Promise that will resolve when an event matches the predicate,
+ * or the end of the stream is reached
+ */
+ Promise<Boolean> anyMatch(Predicate< ? super T> predicate);
+
+ /**
+ * Closes the channel and resolve the promise with false when the predicate
+ * does not matches a pay load.If the channel is closed before, the promise
+ * is resolved with true.
+ * <p>
+ * This is a <strong>short circuiting terminal operation</strong>
+ *
+ * @param predicate
+ * @return A Promise that will resolve when an event fails to match the
+ * predicate, or the end of the stream is reached
+ */
+ Promise<Boolean> allMatch(Predicate< ? super T> predicate);
+
+ /**
+ * Closes the channel and resolve the promise with false when the predicate
+ * matches any pay load. If the channel is closed before, the promise is
+ * resolved with true.
+ * <p>
+ * This is a <strong>short circuiting terminal operation</strong>
+ *
+ * @param predicate
+ * @return A Promise that will resolve when an event matches the predicate,
+ * or the end of the stream is reached
+ */
+ Promise<Boolean> noneMatch(Predicate< ? super T> predicate);
+
+ /**
+ * Close the channel and resolve the promise with the first element. If the
+ * channel is closed before, the Optional will have no value.
+ *
+ * @return a promise
+ */
+ Promise<Optional<T>> findFirst();
+
+ /**
+ * Close the channel and resolve the promise with the first element. If the
+ * channel is closed before, the Optional will have no value.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @return a promise
+ */
+ Promise<Optional<T>> findAny();
+
+ /**
+ * Pass on each event to another consumer until the stream is closed.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param action
+ * @return a promise
+ */
+ Promise<Long> forEachEvent(PushEventConsumer< ? super T> action);
+
+}

Back to the top