Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Watson2017-04-28 18:31:25 +0000
committerThomas Watson2017-06-16 12:38:08 +0000
commitb0b632a4b575f3d996560ae5915370025d18234d (patch)
tree2bf99e7fa8146b5247356a195ac7510ec92c8ab9
parent9c940504036225a9c7d3f869f6d1ca0f55083e50 (diff)
downloadrt.equinox.framework-b0b632a4b575f3d996560ae5915370025d18234d.tar.gz
rt.equinox.framework-b0b632a4b575f3d996560ae5915370025d18234d.tar.xz
rt.equinox.framework-b0b632a4b575f3d996560ae5915370025d18234d.zip
Add pushstream API and fix import ranges
Change-Id: I2bd80833a975650125ee2bf69900d2f99de4f0ce Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
-rw-r--r--bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF11
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractBufferBuilder.java60
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java1480
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferBuilder.java79
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java111
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java35
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEvent.java205
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventConsumer.java69
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventSource.java50
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java609
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilder.java52
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java88
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java581
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicy.java48
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicyOption.java98
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicy.java52
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicyOption.java76
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSource.java104
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java337
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java73
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/package-info.java39
21 files changed, 4252 insertions, 5 deletions
diff --git a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF
index 3568239b1..74f3a73cc 100644
--- a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF
+++ b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF
@@ -5,9 +5,10 @@ Bundle-SymbolicName: org.eclipse.equinox.log.stream
Bundle-Version: 1.0.0.qualifier
Bundle-Activator: org.eclipse.equinox.internal.log.stream.LogStreamFactoryImpl
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
-Import-Package: org.osgi.framework;version="1.3.0",
- org.osgi.service.log;version="1.4.0",
- org.osgi.util.promise;version="1.0.0",
- org.osgi.util.pushstream;version="1.0.0",
- org.osgi.util.tracker;version="1.5.0"
+Import-Package: org.osgi.framework;version="[1.9.0,2.0.0)",
+ org.osgi.service.log;version="[1.4.0,2.0.0)",
+ org.osgi.util.promise;version="[1.0.0,2.0.0)",
+ org.osgi.util.tracker;version="[1.5.0,2.0.0)"
Bundle-ActivationPolicy: lazy
+Export-Package: org.osgi.service.log.stream;version="1.0.0",
+ org.osgi.util.pushstream;version="1.0.0"
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractBufferBuilder.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractBufferBuilder.java
new file mode 100644
index 000000000..a37e407fd
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractBufferBuilder.java
@@ -0,0 +1,60 @@
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent< ? extends T>>>
+ implements BufferBuilder<R,T,U> {
+
+ protected Executor worker;
+ protected int concurrency;
+ protected PushbackPolicy<T,U> backPressure;
+ protected QueuePolicy<T,U> bufferingPolicy;
+ protected U buffer;
+
+ @Override
+ public BufferBuilder<R,T,U> withBuffer(U queue) {
+ this.buffer = queue;
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withQueuePolicy(
+ QueuePolicy<T,U> queuePolicy) {
+ this.bufferingPolicy = queuePolicy;
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withQueuePolicy(
+ QueuePolicyOption queuePolicyOption) {
+ this.bufferingPolicy = queuePolicyOption.getPolicy();
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withPushbackPolicy(
+ PushbackPolicy<T,U> pushbackPolicy) {
+ this.backPressure = pushbackPolicy;
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withPushbackPolicy(
+ PushbackPolicyOption pushbackPolicyOption, long time) {
+ this.backPressure = pushbackPolicyOption.getPolicy(time);
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withParallelism(int parallelism) {
+ this.concurrency = parallelism;
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withExecutor(Executor executor) {
+ this.worker = executor;
+ return this;
+ }
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
new file mode 100644
index 000000000..2293c1aad
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
@@ -0,0 +1,1480 @@
+package org.osgi.util.pushstream;
+
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
+import static org.osgi.util.pushstream.PushEventConsumer.*;
+
+import java.time.Duration;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.TimeoutException;
+import org.osgi.util.pushstream.PushEvent.EventType;
+
+abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
+
+ static enum State {
+ BUILDING, STARTED, CLOSED
+ }
+
+ protected final PushStreamProvider psp;
+
+ protected final Executor defaultExecutor;
+ protected final ScheduledExecutorService scheduler;
+
+ protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING);
+
+ protected final AtomicReference<PushEventConsumer<T>> next = new AtomicReference<>();
+
+ protected final AtomicReference<Runnable> onCloseCallback = new AtomicReference<>();
+ protected final AtomicReference<Consumer<? super Throwable>> onErrorCallback = new AtomicReference<>();
+
+ protected abstract boolean begin();
+
+ AbstractPushStreamImpl(PushStreamProvider psp,
+ Executor executor, ScheduledExecutorService scheduler) {
+ this.psp = psp;
+ this.defaultExecutor = executor;
+ this.scheduler = scheduler;
+ }
+
+ protected long handleEvent(PushEvent< ? extends T> event) {
+ if(closed.get() != CLOSED) {
+ try {
+ if(event.isTerminal()) {
+ close(event.nodata());
+ return ABORT;
+ } else {
+ PushEventConsumer<T> consumer = next.get();
+ long val;
+ if(consumer == null) {
+ //TODO log a warning
+ val = CONTINUE;
+ } else {
+ val = consumer.accept(event);
+ }
+ if(val < 0) {
+ close();
+ }
+ return val;
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ }
+ return ABORT;
+ }
+
+ @Override
+ public void close() {
+ close(PushEvent.close());
+ }
+
+ protected boolean close(PushEvent<T> event) {
+ if(!event.isTerminal()) {
+ throw new IllegalArgumentException("The event " + event + " is not a close event.");
+ }
+ if(closed.getAndSet(CLOSED) != CLOSED) {
+ PushEventConsumer<T> aec = next.getAndSet(null);
+ if(aec != null) {
+ try {
+ aec.accept(event);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ Runnable handler = onCloseCallback.getAndSet(null);
+ if(handler != null) {
+ try {
+ handler.run();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ if (event.getType() == EventType.ERROR) {
+ Consumer<? super Throwable> errorHandler = onErrorCallback.getAndSet(null);
+ if(errorHandler != null) {
+ try {
+ errorHandler.accept(event.getFailure());
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public PushStream<T> onClose(Runnable closeHandler) {
+ if(onCloseCallback.compareAndSet(null, closeHandler)) {
+ if(closed.get() == State.CLOSED && onCloseCallback.compareAndSet(closeHandler, null)) {
+ closeHandler.run();
+ }
+ } else {
+ throw new IllegalStateException("A close handler has already been defined for this stream object");
+ }
+ return this;
+ }
+
+ @Override
+ public PushStream<T> onError(Consumer< ? super Throwable> closeHandler) {
+ if(onErrorCallback.compareAndSet(null, closeHandler)) {
+ if(closed.get() == State.CLOSED) {
+ //TODO log already closed
+ onErrorCallback.set(null);
+ }
+ } else {
+ throw new IllegalStateException("A close handler has already been defined for this stream object");
+ }
+ return this;
+ }
+
+ private void updateNext(PushEventConsumer<T> consumer) {
+ if(!next.compareAndSet(null, consumer)) {
+ throw new IllegalStateException("This stream has already been chained");
+ } else if(closed.get() == CLOSED && next.compareAndSet(consumer, null)) {
+ try {
+ consumer.accept(PushEvent.close());
+ } catch (Exception e) {
+ //TODO log
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public PushStream<T> filter(Predicate< ? super T> predicate) {
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ updateNext((event) -> {
+ try {
+ if (!event.isTerminal()) {
+ if (predicate.test(event.getData())) {
+ return eventStream.handleEvent(event);
+ } else {
+ return CONTINUE;
+ }
+ }
+ return eventStream.handleEvent(event);
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) {
+
+ AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ updateNext(event -> {
+ try {
+ if (!event.isTerminal()) {
+ return eventStream.handleEvent(
+ PushEvent.data(mapper.apply(event.getData())));
+ } else {
+ return eventStream.handleEvent(event.nodata());
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public <R> PushStream<R> flatMap(
+ Function< ? super T, ? extends PushStream< ? extends R>> mapper) {
+ AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+
+ PushEventConsumer<R> consumer = e -> {
+ switch (e.getType()) {
+ case ERROR :
+ close(e.nodata());
+ return ABORT;
+ case CLOSE :
+ // Close should allow the next flat mapped entry
+ // without closing the stream;
+ return ABORT;
+ case DATA :
+ long returnValue = eventStream.handleEvent(e);
+ if (returnValue < 0) {
+ close();
+ return ABORT;
+ }
+ return returnValue;
+ default :
+ throw new IllegalArgumentException(
+ "The event type " + e.getType() + " is unknown");
+ }
+ };
+
+ updateNext(event -> {
+ try {
+ if (!event.isTerminal()) {
+ PushStream< ? extends R> mappedStream = mapper
+ .apply(event.getData());
+
+ return mappedStream.forEachEvent(consumer)
+ .getValue()
+ .longValue();
+ } else {
+ return eventStream.handleEvent(event.nodata());
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> distinct() {
+ Set<T> set = Collections.<T>newSetFromMap(new ConcurrentHashMap<>());
+ return filter(set::add);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public PushStream<T> sorted() {
+ return sorted((Comparator)Comparator.naturalOrder());
+ }
+
+ @Override
+ public PushStream<T> sorted(Comparator< ? super T> comparator) {
+ List<T> list = Collections.synchronizedList(new ArrayList<>());
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ updateNext(event -> {
+ try {
+ switch(event.getType()) {
+ case DATA :
+ list.add(event.getData());
+ return CONTINUE;
+ case CLOSE :
+ list.sort(comparator);
+ sorted: for (T t : list) {
+ if (eventStream
+ .handleEvent(PushEvent.data(t)) < 0) {
+ break sorted;
+ }
+ }
+ // Fall through
+ case ERROR :
+ eventStream.handleEvent(event);
+ return ABORT;
+ }
+ return eventStream.handleEvent(event.nodata());
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> limit(long maxSize) {
+ if(maxSize <= 0) {
+ throw new IllegalArgumentException("The limit must be greater than zero");
+ }
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ AtomicLong counter = new AtomicLong(maxSize);
+ updateNext(event -> {
+ try {
+ if (!event.isTerminal()) {
+ long count = counter.decrementAndGet();
+ if (count > 0) {
+ return eventStream.handleEvent(event);
+ } else if (count == 0) {
+ eventStream.handleEvent(event);
+ }
+ return ABORT;
+ } else {
+ return eventStream.handleEvent(event.nodata());
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> limit(Duration maxTime) {
+
+ Runnable start = () -> scheduler.schedule(() -> close(),
+ maxTime.toNanos(), NANOSECONDS);
+
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
+ psp, defaultExecutor, scheduler, this) {
+ @Override
+ protected void beginning() {
+ start.run();
+ }
+ };
+ updateNext((event) -> {
+ try {
+ return eventStream.handleEvent(event);
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> timeout(Duration maxTime) {
+
+ AtomicLong lastTime = new AtomicLong();
+ long timeout = maxTime.toNanos();
+
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
+ psp, defaultExecutor, scheduler, this) {
+ @Override
+ protected void beginning() {
+ lastTime.set(System.nanoTime());
+ scheduler.schedule(() -> check(lastTime, timeout), timeout,
+ NANOSECONDS);
+ }
+ };
+ updateNext((event) -> {
+ try {
+ return eventStream.handleEvent(event);
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ void check(AtomicLong lastTime, long timeout) {
+ long now = System.nanoTime();
+
+ long elapsed = now - lastTime.get();
+
+ if (elapsed < timeout) {
+ scheduler.schedule(() -> check(lastTime, timeout),
+ timeout - elapsed, NANOSECONDS);
+ } else {
+ close(PushEvent.error(new TimeoutException()));
+ }
+ }
+
+ @Override
+ public PushStream<T> skip(long n) {
+ if (n < 0) {
+ throw new IllegalArgumentException(
+ "The number to skip must be greater than or equal to zero");
+ }
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ AtomicLong counter = new AtomicLong(n);
+ updateNext(event -> {
+ try {
+ if (!event.isTerminal()) {
+ if (counter.get() > 0 && counter.decrementAndGet() >= 0) {
+ return CONTINUE;
+ } else {
+ return eventStream.handleEvent(event);
+ }
+ } else {
+ return eventStream.handleEvent(event.nodata());
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> fork(int n, int delay, Executor ex) {
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+ psp, ex, scheduler, this);
+ Semaphore s = new Semaphore(n);
+ updateNext(event -> {
+ try {
+ if (event.isTerminal()) {
+ s.acquire(n);
+ eventStream.close(event.nodata());
+ return ABORT;
+ }
+
+ s.acquire(1);
+
+ ex.execute(() -> {
+ try {
+ if (eventStream.handleEvent(event) < 0) {
+ eventStream.close(PushEvent.close());
+ }
+ } catch (Exception e1) {
+ close(PushEvent.error(e1));
+ } finally {
+ s.release(1);
+ }
+ });
+
+ return s.getQueueLength() * delay;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> buffer() {
+ return psp.createStream(c -> {
+ forEachEvent(c);
+ return this;
+ });
+ }
+
+ @Override
+ public <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer() {
+ return psp.buildStream(c -> {
+ forEachEvent(c);
+ return this;
+ });
+ }
+
+ @Override
+ public PushStream<T> merge(
+ PushEventSource< ? extends T> source) {
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ AtomicInteger count = new AtomicInteger(2);
+ PushEventConsumer<T> consumer = event -> {
+ try {
+ if (!event.isTerminal()) {
+ return eventStream.handleEvent(event);
+ }
+
+ if (count.decrementAndGet() == 0) {
+ eventStream.handleEvent(event.nodata());
+ return ABORT;
+ }
+ return CONTINUE;
+ } catch (Exception e) {
+ PushEvent<T> error = PushEvent.error(e);
+ close(error);
+ eventStream.close(event.nodata());
+ return ABORT;
+ }
+ };
+ updateNext(consumer);
+ AutoCloseable second;
+ try {
+ second = source.open((PushEvent< ? extends T> event) -> {
+ return consumer.accept(event);
+ });
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ throw new IllegalStateException(
+ "Unable to merge events as the event source could not be opened.",
+ e);
+ }
+
+ return eventStream.onClose(() -> {
+ try {
+ second.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }).map(Function.identity());
+ }
+
+ @Override
+ public PushStream<T> merge(PushStream< ? extends T> source) {
+
+ AtomicInteger count = new AtomicInteger(2);
+ Consumer<AbstractPushStreamImpl<T>> start = downstream -> {
+ PushEventConsumer<T> consumer = e -> {
+ long toReturn;
+ try {
+ if (!e.isTerminal()) {
+ toReturn = downstream.handleEvent(e);
+ } else if (count.decrementAndGet() == 0) {
+ downstream.handleEvent(e);
+ toReturn = ABORT;
+ } else {
+ return ABORT;
+ }
+ } catch (Exception ex) {
+ try {
+ downstream.handleEvent(PushEvent.error(ex));
+ } catch (Exception ex2) { /* Just ignore this */}
+ toReturn = ABORT;
+ }
+ if (toReturn < 0) {
+ try {
+ close();
+ } catch (Exception ex2) { /* Just ignore this */}
+ try {
+ source.close();
+ } catch (Exception ex2) { /* Just ignore this */}
+ }
+ return toReturn;
+ };
+ forEachEvent(consumer);
+ source.forEachEvent(consumer);
+ };
+
+ @SuppressWarnings("resource")
+ AbstractPushStreamImpl<T> eventStream = new AbstractPushStreamImpl<T>(
+ psp, defaultExecutor, scheduler) {
+ @Override
+ protected boolean begin() {
+ if (closed.compareAndSet(BUILDING, STARTED)) {
+ start.accept(this);
+ return true;
+ }
+ return false;
+ }
+ };
+
+
+ return eventStream.onClose(() -> {
+ try {
+ close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ try {
+ source.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }).map(Function.identity());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public PushStream<T>[] split(Predicate< ? super T>... predicates) {
+ Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length);
+ AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length];
+ for(int i = 0; i < tests.length; i++) {
+ rsult[i] = new IntermediatePushStreamImpl<>(psp, defaultExecutor,
+ scheduler, this);
+ }
+
+ Boolean[] array = new Boolean[tests.length];
+ Arrays.fill(array, Boolean.TRUE);
+ AtomicReferenceArray<Boolean> off = new AtomicReferenceArray<>(array);
+
+ AtomicInteger count = new AtomicInteger(tests.length);
+ updateNext(event -> {
+ if (!event.isTerminal()) {
+ long delay = CONTINUE;
+ for (int i = 0; i < tests.length; i++) {
+ try {
+ if (off.get(i).booleanValue()
+ && tests[i].test(event.getData())) {
+ long accept = rsult[i].handleEvent(event);
+ if (accept < 0) {
+ off.set(i, Boolean.TRUE);
+ count.decrementAndGet();
+ } else if (accept > delay) {
+ accept = delay;
+ }
+ }
+ } catch (Exception e) {
+ try {
+ rsult[i].close(PushEvent.error(e));
+ } catch (Exception e2) {
+ //TODO log
+ }
+ off.set(i, Boolean.TRUE);
+ }
+ }
+ if (count.get() == 0)
+ return ABORT;
+
+ return delay;
+ }
+ for (AbstractPushStreamImpl<T> as : rsult) {
+ try {
+ as.handleEvent(event.nodata());
+ } catch (Exception e) {
+ try {
+ as.close(PushEvent.error(e));
+ } catch (Exception e2) {
+ //TODO log
+ }
+ }
+ }
+ return ABORT;
+ });
+ return Arrays.copyOf(rsult, tests.length);
+ }
+
+ @Override
+ public PushStream<T> sequential() {
+ AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ Lock lock = new ReentrantLock();
+ updateNext((event) -> {
+ try {
+ lock.lock();
+ try {
+ return eventStream.handleEvent(event);
+ } finally {
+ lock.unlock();
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public <R> PushStream<R> coalesce(
+ Function< ? super T,Optional<R>> accumulator) {
+ AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ updateNext((event) -> {
+ try {
+ if (!event.isTerminal()) {
+ Optional<PushEvent<R>> coalesced = accumulator
+ .apply(event.getData()).map(PushEvent::data);
+ if (coalesced.isPresent()) {
+ try {
+ return eventStream.handleEvent(coalesced.get());
+ } catch (Exception ex) {
+ close(PushEvent.error(ex));
+ return ABORT;
+ }
+ } else {
+ return CONTINUE;
+ }
+ }
+ return eventStream.handleEvent(event.nodata());
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f) {
+ if (count <= 0)
+ throw new IllegalArgumentException(
+ "A coalesce operation must collect a positive number of events");
+ // This could be optimised to only use a single collection queue.
+ // It would save some GC, but is it worth it?
+ return coalesce(() -> count, f);
+ }
+
+ @Override
+ public <R> PushStream<R> coalesce(IntSupplier count,
+ Function<Collection<T>,R> f) {
+ AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>(
+ null);
+
+ Runnable init = () -> queueRef
+ .set(getQueueForInternalBuffering(count.getAsInt()));
+
+ @SuppressWarnings("resource")
+ AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
+ psp, defaultExecutor, scheduler, this) {
+ @Override
+ protected void beginning() {
+ init.run();
+ }
+ };
+
+ AtomicBoolean endPending = new AtomicBoolean();
+ Object lock = new Object();
+ updateNext((event) -> {
+ try {
+ Queue<T> queue;
+ if (!event.isTerminal()) {
+ synchronized (lock) {
+ for (;;) {
+ queue = queueRef.get();
+ if (queue == null) {
+ if (endPending.get()) {
+ return ABORT;
+ } else {
+ continue;
+ }
+ } else if (queue.offer(event.getData())) {
+ return CONTINUE;
+ } else {
+ queueRef.lazySet(null);
+ break;
+ }
+ }
+ }
+
+ queueRef.set(
+ getQueueForInternalBuffering(count.getAsInt()));
+
+ // This call is on the same thread and so must happen
+ // outside
+ // the synchronized block.
+ return aggregateAndForward(f, eventStream, event,
+ queue);
+ } else {
+ synchronized (lock) {
+ queue = queueRef.get();
+ queueRef.lazySet(null);
+ endPending.set(true);
+ }
+ if (queue != null) {
+ eventStream.handleEvent(
+ PushEvent.data(f.apply(queue)));
+ }
+ }
+ return eventStream.handleEvent(event.nodata());
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ private <R> long aggregateAndForward(Function<Collection<T>,R> f,
+ AbstractPushStreamImpl<R> eventStream,
+ PushEvent< ? extends T> event, Queue<T> queue) {
+ if (!queue.offer(event.getData())) {
+ ((ArrayQueue<T>) queue).forcePush(event.getData());
+ }
+ return eventStream.handleEvent(PushEvent.data(f.apply(queue)));
+ }
+
+
+ @Override
+ public <R> PushStream<R> window(Duration time,
+ Function<Collection<T>,R> f) {
+ return window(time, defaultExecutor, f);
+ }
+
+ @Override
+ public <R> PushStream<R> window(Duration time, Executor executor,
+ Function<Collection<T>,R> f) {
+ return window(() -> time, () -> 0, executor, (t, c) -> f.apply(c));
+ }
+
+ @Override
+ public <R> PushStream<R> window(Supplier<Duration> time,
+ IntSupplier maxEvents,
+ BiFunction<Long,Collection<T>,R> f) {
+ return window(time, maxEvents, defaultExecutor, f);
+ }
+
+ @Override
+ public <R> PushStream<R> window(Supplier<Duration> time,
+ IntSupplier maxEvents, Executor ex,
+ BiFunction<Long,Collection<T>,R> f) {
+
+ AtomicLong timestamp = new AtomicLong();
+ AtomicLong previousWindowSize = new AtomicLong();
+ AtomicLong counter = new AtomicLong();
+ Object lock = new Object();
+ AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>(
+ null);
+
+ // This code is declared as a separate block to avoid any confusion
+ // about which instance's methods and variables are in scope
+ Consumer<AbstractPushStreamImpl<R>> begin = p -> {
+
+ synchronized (lock) {
+ timestamp.lazySet(System.nanoTime());
+ long count = counter.get();
+
+
+ long windowSize = time.get().toNanos();
+ previousWindowSize.set(windowSize);
+ scheduler.schedule(
+ getWindowTask(p, f, time, maxEvents, lock, count,
+ queueRef, timestamp, counter,
+ previousWindowSize, ex),
+ windowSize, NANOSECONDS);
+ }
+
+ queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
+ };
+
+ @SuppressWarnings("resource")
+ AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
+ psp, ex, scheduler, this) {
+ @Override
+ protected void beginning() {
+ begin.accept(this);
+ }
+ };
+
+ AtomicBoolean endPending = new AtomicBoolean(false);
+ updateNext((event) -> {
+ try {
+ if (eventStream.closed.get() == CLOSED) {
+ return ABORT;
+ }
+ Queue<T> queue;
+ if (!event.isTerminal()) {
+ long elapsed;
+ long newCount;
+ synchronized (lock) {
+ for (;;) {
+ queue = queueRef.get();
+ if (queue == null) {
+ if (endPending.get()) {
+ return ABORT;
+ } else {
+ continue;
+ }
+ } else if (queue.offer(event.getData())) {
+ return CONTINUE;
+ } else {
+ queueRef.lazySet(null);
+ break;
+ }
+ }
+
+ long now = System.nanoTime();
+ elapsed = now - timestamp.get();
+ timestamp.lazySet(now);
+ newCount = counter.get() + 1;
+ counter.lazySet(newCount);
+
+ // This is a non-blocking call, and must happen in the
+ // synchronized block to avoid re=ordering the executor
+ // enqueue with a subsequent incoming close operation
+ aggregateAndForward(f, eventStream, event, queue,
+ ex, elapsed);
+ }
+ // These must happen outside the synchronized block as we
+ // call out to user code
+ queueRef.set(
+ getQueueForInternalBuffering(maxEvents.getAsInt()));
+ long nextWindow = time.get().toNanos();
+ long backpressure = previousWindowSize.getAndSet(nextWindow)
+ - elapsed;
+ scheduler.schedule(
+ getWindowTask(eventStream, f, time, maxEvents, lock,
+ newCount, queueRef, timestamp, counter,
+ previousWindowSize, ex),
+ nextWindow, NANOSECONDS);
+
+ return backpressure < 0 ? CONTINUE
+ : NANOSECONDS.toMillis(backpressure);
+ } else {
+ long elapsed;
+ synchronized (lock) {
+ queue = queueRef.get();
+ queueRef.lazySet(null);
+ endPending.set(true);
+ long now = System.nanoTime();
+ elapsed = now - timestamp.get();
+ counter.lazySet(counter.get() + 1);
+ }
+ Collection<T> collected = queue == null ? emptyList()
+ : queue;
+ ex.execute(() -> {
+ try {
+ eventStream
+ .handleEvent(PushEvent.data(f.apply(
+ Long.valueOf(NANOSECONDS
+ .toMillis(elapsed)),
+ collected)));
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ });
+ }
+ ex.execute(() -> eventStream.handleEvent(event.nodata()));
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ protected Queue<T> getQueueForInternalBuffering(int size) {
+ if (size == 0) {
+ return new LinkedList<T>();
+ } else {
+ return new ArrayQueue<>(size - 1);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ /**
+ * A special queue that keeps one element in reserve and can have that last
+ * element set using forcePush. After the element is set the capacity is
+ * permanently increased by one and cannot grow further.
+ *
+ * @param <E> The element type
+ */
+ private static class ArrayQueue<E> extends AbstractQueue<E>
+ implements Queue<E> {
+
+ final Object[] store;
+
+ int normalLength;
+
+ int nextIndex;
+
+ int size;
+
+ ArrayQueue(int capacity) {
+ store = new Object[capacity + 1];
+ normalLength = store.length - 1;
+ }
+
+ @Override
+ public boolean offer(E e) {
+ if (e == null)
+ throw new NullPointerException("Null values are not supported");
+ if (size < normalLength) {
+ store[nextIndex] = e;
+ size++;
+ nextIndex++;
+ nextIndex = nextIndex % normalLength;
+ return true;
+ }
+ return false;
+ }
+
+ public void forcePush(E e) {
+ store[normalLength] = e;
+ normalLength++;
+ size++;
+ }
+
+ @Override
+ public E poll() {
+ if (size == 0) {
+ return null;
+ } else {
+ int idx = nextIndex - size;
+ if (idx < 0) {
+ idx += normalLength;
+ }
+ E value = (E) store[idx];
+ store[idx] = null;
+ size--;
+ return value;
+ }
+ }
+
+ @Override
+ public E peek() {
+ if (size == 0) {
+ return null;
+ } else {
+ int idx = nextIndex - size;
+ if (idx < 0) {
+ idx += normalLength;
+ }
+ return (E) store[idx];
+ }
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ final int previousNext = nextIndex;
+ return new Iterator<E>() {
+
+ int idx;
+
+ int remaining = size;
+
+ {
+ idx = nextIndex - size;
+ if (idx < 0) {
+ idx += normalLength;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextIndex != previousNext) {
+ throw new ConcurrentModificationException(
+ "The queue was concurrently modified");
+ }
+ return remaining > 0;
+ }
+
+ @Override
+ public E next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException(
+ "The iterator has no more values");
+ }
+ E value = (E) store[idx];
+ idx++;
+ remaining--;
+ if (idx == normalLength) {
+ idx = 0;
+ }
+ return value;
+ }
+
+ };
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ }
+
+ private <R> Runnable getWindowTask(AbstractPushStreamImpl<R> eventStream,
+ BiFunction<Long,Collection<T>,R> f, Supplier<Duration> time,
+ IntSupplier maxEvents, Object lock, long expectedCounter,
+ AtomicReference<Queue<T>> queueRef, AtomicLong timestamp,
+ AtomicLong counter, AtomicLong previousWindowSize,
+ Executor executor) {
+ return () -> {
+
+ Queue<T> queue = null;
+ long elapsed;
+ synchronized (lock) {
+
+ if (counter.get() != expectedCounter) {
+ return;
+ }
+ counter.lazySet(expectedCounter + 1);
+
+ long now = System.nanoTime();
+ elapsed = now - timestamp.get();
+ timestamp.lazySet(now);
+
+ queue = queueRef.get();
+ queueRef.lazySet(null);
+
+ // This is a non-blocking call, and must happen in the
+ // synchronized block to avoid re=ordering the executor
+ // enqueue with a subsequent incoming close operation
+
+ Collection<T> collected = queue == null ? emptyList() : queue;
+ executor.execute(() -> {
+ try {
+ eventStream.handleEvent(PushEvent.data(f.apply(
+ Long.valueOf(NANOSECONDS.toMillis(elapsed)),
+ collected)));
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ });
+ }
+
+ // These must happen outside the synchronized block as we
+ // call out to user code
+ long nextWindow = time.get().toNanos();
+ previousWindowSize.set(nextWindow);
+ queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
+ scheduler.schedule(
+ getWindowTask(eventStream, f, time, maxEvents, lock,
+ expectedCounter + 1, queueRef, timestamp, counter,
+ previousWindowSize, executor),
+ nextWindow, NANOSECONDS);
+ };
+ }
+
+ private <R> void aggregateAndForward(BiFunction<Long,Collection<T>,R> f,
+ AbstractPushStreamImpl<R> eventStream,
+ PushEvent< ? extends T> event, Queue<T> queue, Executor executor,
+ long elapsed) {
+ executor.execute(() -> {
+ try {
+ if (!queue.offer(event.getData())) {
+ ((ArrayQueue<T>) queue).forcePush(event.getData());
+ }
+ long result = eventStream.handleEvent(PushEvent.data(
+ f.apply(Long.valueOf(NANOSECONDS.toMillis(elapsed)),
+ queue)));
+ if (result < 0) {
+ close();
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ });
+ }
+
+ @Override
+ public Promise<Void> forEach(Consumer< ? super T> action) {
+ Deferred<Void> d = new Deferred<>();
+ updateNext((event) -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+ action.accept(event.getData());
+ return CONTINUE;
+ case CLOSE:
+ d.resolve(null);
+ break;
+ case ERROR:
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ d.fail(e);
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public Promise<Object[]> toArray() {
+ return collect(Collectors.toList())
+ .map(List::toArray);
+ }
+
+ @Override
+ public <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator) {
+ return collect(Collectors.toList())
+ .map(l -> l.toArray(generator.apply(l.size())));
+ }
+
+ @Override
+ public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) {
+ Deferred<T> d = new Deferred<>();
+ AtomicReference<T> iden = new AtomicReference<T>(identity);
+
+ updateNext(event -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+ iden.accumulateAndGet(event.getData(), accumulator);
+ return CONTINUE;
+ case CLOSE:
+ d.resolve(iden.get());
+ break;
+ case ERROR:
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) {
+ Deferred<Optional<T>> d = new Deferred<>();
+ AtomicReference<T> iden = new AtomicReference<T>(null);
+
+ updateNext(event -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+ if (!iden.compareAndSet(null, event.getData()))
+ iden.accumulateAndGet(event.getData(), accumulator);
+ return CONTINUE;
+ case CLOSE:
+ d.resolve(Optional.ofNullable(iden.get()));
+ break;
+ case ERROR:
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
+ Deferred<U> d = new Deferred<>();
+ AtomicReference<U> iden = new AtomicReference<>(identity);
+
+ updateNext(event -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+ iden.updateAndGet((e) -> accumulator.apply(e, event.getData()));
+ return CONTINUE;
+ case CLOSE:
+ d.resolve(iden.get());
+ break;
+ case ERROR:
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) {
+ A result = collector.supplier().get();
+ BiConsumer<A, ? super T> accumulator = collector.accumulator();
+ Deferred<R> d = new Deferred<>();
+ PushEventConsumer<T> consumer;
+
+ if (collector.characteristics()
+ .contains(Collector.Characteristics.CONCURRENT)) {
+ consumer = event -> {
+ try {
+ switch (event.getType()) {
+ case DATA :
+ accumulator.accept(result, event.getData());
+ return CONTINUE;
+ case CLOSE :
+ d.resolve(collector.finisher().apply(result));
+ break;
+ case ERROR :
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ };
+ } else {
+ consumer = event -> {
+ try {
+ switch (event.getType()) {
+ case DATA :
+ synchronized (result) {
+ accumulator.accept(result, event.getData());
+ }
+ return CONTINUE;
+ case CLOSE :
+ d.resolve(collector.finisher().apply(result));
+ break;
+ case ERROR :
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ };
+ }
+
+ updateNext(consumer);
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public Promise<Optional<T>> min(Comparator<? super T> comparator) {
+ return reduce((a, b) -> comparator.compare(a, b) <= 0 ? a : b);
+ }
+
+ @Override
+ public Promise<Optional<T>> max(Comparator<? super T> comparator) {
+ return reduce((a, b) -> comparator.compare(a, b) > 0 ? a : b);
+ }
+
+ @Override
+ public Promise<Long> count() {
+ Deferred<Long> d = new Deferred<>();
+ LongAdder counter = new LongAdder();
+ updateNext((event) -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+ counter.add(1);
+ return CONTINUE;
+ case CLOSE:
+ d.resolve(Long.valueOf(counter.sum()));
+ break;
+ case ERROR:
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public Promise<Boolean> anyMatch(Predicate<? super T> predicate) {
+ return filter(predicate).findAny()
+ .map(Optional::isPresent);
+ }
+
+ @Override
+ public Promise<Boolean> allMatch(Predicate<? super T> predicate) {
+ return filter(x -> !predicate.test(x)).findAny()
+ .map(o -> Boolean.valueOf(!o.isPresent()));
+ }
+
+ @Override
+ public Promise<Boolean> noneMatch(Predicate<? super T> predicate) {
+ return filter(predicate).findAny()
+ .map(o -> Boolean.valueOf(!o.isPresent()));
+ }
+
+ @Override
+ public Promise<Optional<T>> findFirst() {
+ Deferred<Optional<T>> d = new Deferred<>();
+ updateNext((event) -> {
+ try {
+ Optional<T> o = null;
+ switch(event.getType()) {
+ case DATA:
+ o = Optional.of(event.getData());
+ break;
+ case CLOSE:
+ o = Optional.empty();
+ break;
+ case ERROR:
+ d.fail(event.getFailure());
+ return ABORT;
+ }
+ if(!d.getPromise().isDone())
+ d.resolve(o);
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public Promise<Optional<T>> findAny() {
+ return findFirst();
+ }
+
+ @Override
+ public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) {
+ Deferred<Long> d = new Deferred<>();
+ LongAdder la = new LongAdder();
+ updateNext((event) -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+ long value = action.accept(event);
+ la.add(value);
+ return value;
+ case CLOSE:
+ try {
+ action.accept(event);
+ } finally {
+ d.resolve(Long.valueOf(la.sum()));
+ }
+ break;
+ case ERROR:
+ try {
+ action.accept(event);
+ } finally {
+ d.fail(event.getFailure());
+ }
+ break;
+ }
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferBuilder.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferBuilder.java
new file mode 100644
index 000000000..2aa6ec763
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferBuilder.java
@@ -0,0 +1,79 @@
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+/**
+ * Create a buffered section of a Push-based stream
+ *
+ * @param <R> The type of object being built
+ * @param <T> The type of objects in the {@link PushEvent}
+ * @param <U> The type of the Queue used in the user specified buffer
+ */
+public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends T>>> {
+
+ /**
+ * The BlockingQueue implementation to use as a buffer
+ *
+ * @param queue
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withBuffer(U queue);
+
+ /**
+ * Set the {@link QueuePolicy} of this Builder
+ *
+ * @param queuePolicy
+ * @return this builder
+ */
+ BufferBuilder<R,T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
+
+ /**
+ * Set the {@link QueuePolicy} of this Builder
+ *
+ * @param queuePolicyOption
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withQueuePolicy(QueuePolicyOption queuePolicyOption);
+
+ /**
+ * Set the {@link PushbackPolicy} of this builder
+ *
+ * @param pushbackPolicy
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicy<T, U> pushbackPolicy);
+
+ /**
+ * Set the {@link PushbackPolicy} of this builder
+ *
+ * @param pushbackPolicyOption
+ * @param time
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicyOption pushbackPolicyOption, long time);
+
+ /**
+ * Set the maximum permitted number of concurrent event deliveries allowed
+ * from this buffer
+ *
+ * @param parallelism
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withParallelism(int parallelism);
+
+ /**
+ * Set the {@link Executor} that should be used to deliver events from this
+ * buffer
+ *
+ * @param executor
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withExecutor(Executor executor);
+
+ /**
+ * @return the object being built
+ */
+ R create();
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
new file mode 100644
index 000000000..7cedafb5c
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
@@ -0,0 +1,111 @@
+package org.osgi.util.pushstream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED;
+import static org.osgi.util.pushstream.PushEventConsumer.ABORT;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+ extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> {
+
+ private final U eventQueue;
+
+ private final Semaphore semaphore;
+
+ private final Executor worker;
+
+ private final QueuePolicy<T, U> queuePolicy;
+
+ private final PushbackPolicy<T, U> pushbackPolicy;
+
+ /**
+ * Indicates that a terminal event has been received, that we should stop
+ * collecting new events, and that we must drain the buffer before
+ * continuing
+ */
+ private final AtomicBoolean softClose = new AtomicBoolean();
+
+ private final int parallelism;
+
+ BufferedPushStreamImpl(PushStreamProvider psp,
+ ScheduledExecutorService scheduler, U eventQueue,
+ int parallelism, Executor worker, QueuePolicy<T,U> queuePolicy,
+ PushbackPolicy<T,U> pushbackPolicy,
+ Function<PushEventConsumer<T>,AutoCloseable> connector) {
+ super(psp, worker, scheduler, connector);
+ this.eventQueue = eventQueue;
+ this.parallelism = parallelism;
+ this.semaphore = new Semaphore(parallelism);
+ this.worker = worker;
+ this.queuePolicy = queuePolicy;
+ this.pushbackPolicy = pushbackPolicy;
+ }
+
+ @Override
+ protected long handleEvent(PushEvent< ? extends T> event) {
+
+ // If we have already been soft closed, or hard closed then abort
+ if (!softClose.compareAndSet(false, event.isTerminal())
+ || closed.get() == CLOSED) {
+ return ABORT;
+ }
+
+ try {
+ queuePolicy.doOffer(eventQueue, event);
+ long backPressure = pushbackPolicy.pushback(eventQueue);
+ if(backPressure < 0) {
+ close();
+ return ABORT;
+ }
+ if(semaphore.tryAcquire()) {
+ startWorker();
+ }
+ return backPressure;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ }
+
+ private void startWorker() {
+ worker.execute(() -> {
+ try {
+ PushEvent< ? extends T> event;
+ while ((event = eventQueue.poll()) != null) {
+ if (event.isTerminal()) {
+ // Wait for the other threads to finish
+ semaphore.acquire(parallelism - 1);
+ }
+
+ long backpressure = super.handleEvent(event);
+ if(backpressure < 0) {
+ close();
+ return;
+ } else if(backpressure > 0) {
+ scheduler.schedule(this::startWorker, backpressure,
+ MILLISECONDS);
+ return;
+ }
+ }
+
+ semaphore.release();
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ if(eventQueue.peek() != null && semaphore.tryAcquire()) {
+ try {
+ startWorker();
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ }
+ });
+
+ }
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
new file mode 100644
index 000000000..3a4da2fd9
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
@@ -0,0 +1,35 @@
+package org.osgi.util.pushstream;
+
+import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
+ implements PushStream<T> {
+
+ private final AbstractPushStreamImpl< ? > previous;
+
+ IntermediatePushStreamImpl(PushStreamProvider psp,
+ Executor executor, ScheduledExecutorService scheduler,
+ AbstractPushStreamImpl< ? > previous) {
+ super(psp, executor, scheduler);
+ this.previous = previous;
+ }
+
+ @Override
+ protected boolean begin() {
+ if(closed.compareAndSet(BUILDING, STARTED)) {
+ beginning();
+ previous.begin();
+ return true;
+ }
+ return false;
+ }
+
+ protected void beginning() {
+ // The base implementation has nothing to do, but
+ // this method is used in windowing
+ }
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEvent.java
new file mode 100644
index 000000000..028f0a392
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEvent.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import static org.osgi.util.pushstream.PushEvent.EventType.*;
+
+/**
+ * A PushEvent is an immutable object that is transferred through a
+ * communication channel to push information to a downstream consumer. The event
+ * has three different types:
+ * <ul>
+ * <li>{@link EventType#DATA} – Provides access to a typed data element in the
+ * stream.
+ * <li>{@link EventType#CLOSE} – The stream is closed. After receiving this
+ * event, no more events will follow.
+ * <li>{@link EventType#ERROR} – The stream ran into an unrecoverable problem
+ * and is sending the reason downstream. The stream is closed and no more events
+ * will follow after this event.
+ * </ul>
+ *
+ * @param <T> The payload type of the event.
+ * @Immutable
+ */
+public abstract class PushEvent<T> {
+
+ /**
+ * The type of a {@link PushEvent}.
+ */
+ public static enum EventType {
+ /**
+ * A data event forming part of the stream
+ */
+ DATA,
+ /**
+ * An error event that indicates streaming has failed and that no more
+ * events will arrive
+ */
+ ERROR,
+ /**
+ * An event that indicates that the stream has terminated normally
+ */
+ CLOSE
+ }
+
+ /**
+ * Package private default constructor.
+ */
+ PushEvent() {}
+
+ /**
+ * Get the type of this event.
+ *
+ * @return The type of this event.
+ */
+ public abstract EventType getType();
+
+ /**
+ * Return the data for this event.
+ *
+ * @return The data payload.
+ * @throws IllegalStateException if this event is not a
+ * {@link EventType#DATA} event.
+ */
+ public T getData() throws IllegalStateException {
+ throw new IllegalStateException(
+ "Not a DATA event, the event type is " + getType());
+ }
+
+ /**
+ * Return the error that terminated the stream.
+ *
+ * @return The error that terminated the stream.
+ * @throws IllegalStateException if this event is not an
+ * {@link EventType#ERROR} event.
+ */
+ public Exception getFailure() throws IllegalStateException {
+ throw new IllegalStateException(
+ "Not an ERROR event, the event type is " + getType());
+ }
+
+ /**
+ * Answer if no more events will follow after this event.
+ *
+ * @return {@code false} if this is a data event, otherwise {@code true}.
+ */
+ public boolean isTerminal() {
+ return true;
+ }
+
+ /**
+ * Create a new data event.
+ *
+ * @param <T> The payload type.
+ * @param payload The payload.
+ * @return A new data event wrapping the specified payload.
+ */
+ public static <T> PushEvent<T> data(T payload) {
+ return new DataEvent<T>(payload);
+ }
+
+ /**
+ * Create a new error event.
+ *
+ * @param <T> The payload type.
+ * @param e The error.
+ * @return A new error event with the specified error.
+ */
+ public static <T> PushEvent<T> error(Exception e) {
+ return new ErrorEvent<T>(e);
+ }
+
+ /**
+ * Create a new close event.
+ *
+ * @param <T> The payload type.
+ * @return A new close event.
+ */
+ public static <T> PushEvent<T> close() {
+ return new CloseEvent<T>();
+ }
+
+ /**
+ * Convenience to cast a close/error event to another payload type. Since
+ * the payload type is not needed for these events this is harmless. This
+ * therefore allows you to forward the close/error event downstream without
+ * creating anew event.
+ *
+ * @param <X> The new payload type.
+ * @return The current error or close event mapped to a new payload type.
+ * @throws IllegalStateException if the event is a {@link EventType#DATA}
+ * event.
+ */
+ public <X> PushEvent<X> nodata() throws IllegalStateException {
+ @SuppressWarnings("unchecked")
+ PushEvent<X> result = (PushEvent<X>) this;
+ return result;
+ }
+
+ static final class DataEvent<T> extends PushEvent<T> {
+ private final T data;
+
+ DataEvent(T data) {
+ this.data = data;
+ }
+
+ @Override
+ public T getData() throws IllegalStateException {
+ return data;
+ }
+
+ @Override
+ public EventType getType() {
+ return DATA;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+
+ @Override
+ public <X> PushEvent<X> nodata() throws IllegalStateException {
+ throw new IllegalStateException("This event is a DATA event");
+ }
+ }
+
+ static final class ErrorEvent<T> extends PushEvent<T> {
+ private final Exception error;
+
+ ErrorEvent(Exception error) {
+ this.error = error;
+ }
+
+ @Override
+ public Exception getFailure() {
+ return error;
+ }
+
+ @Override
+ public EventType getType() {
+ return ERROR;
+ }
+ }
+
+ static final class CloseEvent<T> extends PushEvent<T> {
+ @Override
+ public EventType getType() {
+ return CLOSE;
+ }
+ }
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventConsumer.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventConsumer.java
new file mode 100644
index 000000000..43de152ae
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventConsumer.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * An Async Event Consumer asynchronously receives Data events until it receives
+ * either a Close or Error event.
+ *
+ * @param <T>
+ * The type for the event payload
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushEventConsumer<T> {
+
+ /**
+ * If ABORT is used as return value, the sender should close the channel all
+ * the way to the upstream source. The ABORT will not guarantee that no
+ * more events are delivered since this is impossible in a concurrent
+ * environment. The consumer should accept subsequent events and close/clean
+ * up when the Close or Error event is received.
+ *
+ * Though ABORT has the value -1, any value less than 0 will act as an
+ * abort.
+ */
+ long ABORT = -1;
+
+ /**
+ * A 0 indicates that the consumer is willing to receive subsequent events
+ * at full speeds.
+ *
+ * Any value more than 0 will indicate that the consumer is becoming
+ * overloaded and wants a delay of the given milliseconds before the next
+ * event is sent. This allows the consumer to pushback the event delivery
+ * speed.
+ */
+ long CONTINUE = 0;
+
+ /**
+ * Accept an event from a source. Events can be delivered on multiple
+ * threads simultaneously. However, Close and Error events are the last
+ * events received, no more events must be sent after them.
+ *
+ * @param event The event
+ * @return less than 0 means abort, 0 means continue, more than 0 means
+ * delay ms
+ * @throws Exception to indicate that an error has occured and that no
+ * further events should be delivered to this
+ * {@link PushEventConsumer}
+ */
+ long accept(PushEvent<? extends T> event) throws Exception;
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventSource.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventSource.java
new file mode 100644
index 000000000..d43399d77
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventSource.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * An event source. An event source can open a channel between a source and a
+ * consumer. Once the channel is opened (even before it returns) the source can
+ * send events to the consumer.
+ *
+ * A source should stop sending and automatically close the channel when sending
+ * an event returns a negative value, see {@link PushEventConsumer#ABORT}.
+ * Values that are larger than 0 should be treated as a request to delay the
+ * next events with those number of milliseconds.
+ *
+ * @param <T>
+ * The payload type
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushEventSource<T> {
+
+ /**
+ * Open the asynchronous channel between the source and the consumer. The
+ * call returns an {@link AutoCloseable}. This can be closed, and should
+ * close the channel, including sending a Close event if the channel was not
+ * already closed. The returned object must be able to be closed multiple
+ * times without sending more than one Close events.
+ *
+ * @param aec the consumer (not null)
+ * @return a {@link AutoCloseable} that can be used to close the stream
+ * @throws Exception
+ */
+ AutoCloseable open(PushEventConsumer< ? super T> aec) throws Exception;
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java
new file mode 100644
index 000000000..c26bc8c4d
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java
@@ -0,0 +1,609 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.TimeoutException;
+
+/**
+ * A Push Stream fulfills the same role as the Java 8 stream but it reverses the
+ * control direction. The Java 8 stream is pull based and this is push based. A
+ * Push Stream makes it possible to build a pipeline of transformations using a
+ * builder kind of model. Just like streams, it provides a number of terminating
+ * methods that will actually open the channel and perform the processing until
+ * the channel is closed (The source sends a Close event). The results of the
+ * processing will be send to a Promise, just like any error events. A stream
+ * can be used multiple times. The Push Stream represents a pipeline. Upstream
+ * is in the direction of the source, downstream is in the direction of the
+ * terminating method. Events are sent downstream asynchronously with no
+ * guarantee for ordering or concurrency. Methods are available to provide
+ * serialization of the events and splitting in background threads.
+ *
+ * @param <T> The Payload type
+ */
+@ProviderType
+public interface PushStream<T> extends AutoCloseable {
+
+ /**
+ * Must be run after the channel is closed. This handler will run after the
+ * downstream methods have processed the close event and before the upstream
+ * methods have closed.
+ *
+ * @param closeHandler Will be called on close
+ * @return This stream
+ */
+ PushStream<T> onClose(Runnable closeHandler);
+
+ /**
+ * Must be run after the channel is closed. This handler will run after the
+ * downstream methods have processed the close event and before the upstream
+ * methods have closed.
+ *
+ * @param closeHandler Will be called on close
+ * @return This stream
+ */
+ PushStream<T> onError(Consumer< ? super Throwable> closeHandler);
+
+ /**
+ * Only pass events downstream when the predicate tests true.
+ *
+ * @param predicate The predicate that is tested (not null)
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> filter(Predicate< ? super T> predicate);
+
+ /**
+ * Map a payload value.
+ *
+ * @param mapper The map function
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> map(Function< ? super T, ? extends R> mapper);
+
+ /**
+ * Flat map the payload value (turn one event into 0..n events of
+ * potentially another type).
+ *
+ * @param mapper The flat map function
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> flatMap(
+ Function< ? super T, ? extends PushStream< ? extends R>> mapper);
+
+ /**
+ * Remove any duplicates. Notice that this can be expensive in a large
+ * stream since it must track previous payloads.
+ *
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> distinct();
+
+ /**
+ * Sorted the elements, assuming that T extends Comparable. This is of
+ * course expensive for large or infinite streams since it requires
+ * buffering the stream until close.
+ *
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> sorted();
+
+ /**
+ * Sorted the elements with the given comparator. This is of course
+ * expensive for large or infinite streams since it requires buffering the
+ * stream until close.
+ *
+ * @param comparator
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> sorted(Comparator< ? super T> comparator);
+
+ /**
+ * Automatically close the channel after the maxSize number of elements is
+ * received.
+ *
+ * @param maxSize Maximum number of elements has been received
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> limit(long maxSize);
+
+ /**
+ * Automatically close the channel after the given amount of time has
+ * elapsed.
+ *
+ * @param maxTime The maximum time that the stream should remain open
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> limit(Duration maxTime);
+
+ /**
+ * Automatically fail the channel if no events are received for the
+ * indicated length of time. If the timeout is reached then a failure event
+ * containing a {@link TimeoutException} will be sent.
+ *
+ * @param idleTime The length of time that the stream should remain open
+ * when no events are being received.
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> timeout(Duration idleTime);
+
+ /**
+ * Skip a number of events in the channel.
+ *
+ * @param n number of elements to skip
+ * @throws IllegalArgumentException if the number of events to skip is
+ * negative
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> skip(long n);
+
+ /**
+ * Execute the downstream events in up to n background threads. If more
+ * requests are outstanding apply delay * nr of delayed threads back
+ * pressure. A downstream channel that is closed or throws an exception will
+ * cause all execution to cease and the stream to close
+ *
+ * @param n number of simultaneous background threads to use
+ * @param delay Nr of ms/thread that is queued back pressure
+ * @param e an executor to use for the background threads.
+ * @return Builder style (can be a new or the same object)
+ * @throws IllegalArgumentException if the number of threads is < 1 or the
+ * delay is < 0
+ * @throws NullPointerException if the Executor is null
+ */
+ PushStream<T> fork(int n, int delay, Executor e)
+ throws IllegalArgumentException, NullPointerException;
+
+ /**
+ * Buffer the events in a queue using default values for the queue size and
+ * other behaviours. Buffered work will be processed asynchronously in the
+ * rest of the chain. Buffering also blocks the transmission of back
+ * pressure to previous elements in the chain, although back pressure is
+ * honoured by the buffer.
+ * <p>
+ * Buffers are useful for "bursty" event sources which produce a number of
+ * events close together, then none for some time. These bursts can
+ * sometimes overwhelm downstream event consumers. Buffering will not,
+ * however, protect downstream components from a source which produces
+ * events faster than they can be consumed. For fast sources
+ * {@link #filter(Predicate)} and {@link #coalesce(int, Function)}
+ * {@link #fork(int, int, Executor)} are better choices.
+ *
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> buffer();
+
+ /**
+ * Build a buffer to enqueue events in a queue using custom values for the
+ * queue size and other behaviours. Buffered work will be processed
+ * asynchronously in the rest of the chain. Buffering also blocks the
+ * transmission of back pressure to previous elements in the chain, although
+ * back pressure is honoured by the buffer.
+ * <p>
+ * Buffers are useful for "bursty" event sources which produce a number of
+ * events close together, then none for some time. These bursts can
+ * sometimes overwhelm downstream event consumers. Buffering will not,
+ * however, protect downstream components from a source which produces
+ * events faster than they can be consumed. For fast sources
+ * {@link #filter(Predicate)} and {@link #coalesce(int, Function)}
+ * {@link #fork(int, int, Executor)} are better choices.
+ * <p>
+ * Buffers are also useful as "circuit breakers" in the pipeline. If a
+ * {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger
+ * the stream to close, preventing an event storm from reaching the client.
+ *
+ * @param parallelism
+ * @param executor
+ * @param queue
+ * @param queuePolicy
+ * @param pushbackPolicy
+ * @return Builder style (can be a new or the same object)
+ */
+ <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer();
+
+ /**
+ * Merge in the events from another source. The resulting channel is not
+ * closed until this channel and the channel from the source are closed.
+ *
+ * @param source The source to merge in.
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> merge(PushEventSource< ? extends T> source);
+
+ /**
+ * Merge in the events from another PushStream. The resulting channel is not
+ * closed until this channel and the channel from the source are closed.
+ *
+ * @param source The source to merge in.
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> merge(PushStream< ? extends T> source);
+
+ /**
+ * Split the events to different streams based on a predicate. If the
+ * predicate is true, the event is dispatched to that channel on the same
+ * position. All predicates are tested for every event.
+ * <p>
+ * This method differs from other methods of AsyncStream in three
+ * significant ways:
+ * <ul>
+ * <li>The return value contains multiple streams.</li>
+ * <li>This stream will only close when all of these child streams have
+ * closed.</li>
+ * <li>Event delivery is made to all open children that accept the event.
+ * </li>
+ * </ul>
+ *
+ * @param predicates the predicates to test
+ * @return streams that map to the predicates
+ */
+ @SuppressWarnings("unchecked")
+ PushStream<T>[] split(Predicate< ? super T>... predicates);
+
+ /**
+ * Ensure that any events are delivered sequentially. That is, no
+ * overlapping calls downstream. This can be used to turn a forked stream
+ * (where for example a heavy conversion is done in multiple threads) back
+ * into a sequential stream so a reduce is simple to do.
+ *
+ * @return Builder style (can be a new or the same object)
+ */
+ PushStream<T> sequential();
+
+ /**
+ * Coalesces a number of events into a new type of event. The input events
+ * are forwarded to a accumulator function. This function returns an
+ * Optional. If the optional is present, it's value is send downstream,
+ * otherwise it is ignored.
+ *
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> coalesce(Function< ? super T,Optional<R>> f);
+
+ /**
+ * Coalesces a number of events into a new type of event. A fixed number of
+ * input events are forwarded to a accumulator function. This function
+ * returns new event data to be forwarded on.
+ *
+ * @param count
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f);
+
+ /**
+ * Coalesces a number of events into a new type of event. A variable number
+ * of input events are forwarded to a accumulator function. The number of
+ * events to be forwarded is determined by calling the count function. The
+ * accumulator function then returns new event data to be forwarded on.
+ *
+ * @param count
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ public <R> PushStream<R> coalesce(IntSupplier count,
+ Function<Collection<T>,R> f);
+
+ /**
+ * Buffers a number of events over a fixed time interval and then forwards
+ * the events to an accumulator function. This function returns new event
+ * data to be forwarded on. Note that:
+ * <ul>
+ * <li>The collection forwarded to the accumulator function will be empty if
+ * no events arrived during the time interval.</li>
+ * <li>The accumulator function will be run and the forwarded event
+ * delivered as a different task, (and therefore potentially on a different
+ * thread) from the one that delivered the event to this {@link PushStream}.
+ * </li>
+ * <li>Due to the buffering and asynchronous delivery required, this method
+ * prevents the propagation of back-pressure to earlier stages</li>
+ * </ul>
+ *
+ * @param d
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> window(Duration d, Function<Collection<T>,R> f);
+
+ /**
+ * Buffers a number of events over a fixed time interval and then forwards
+ * the events to an accumulator function. This function returns new event
+ * data to be forwarded on. Note that:
+ * <ul>
+ * <li>The collection forwarded to the accumulator function will be empty if
+ * no events arrived during the time interval.</li>
+ * <li>The accumulator function will be run and the forwarded event
+ * delivered by a task given to the supplied executor.</li>
+ * <li>Due to the buffering and asynchronous delivery required, this method
+ * prevents the propagation of back-pressure to earlier stages</li>
+ * </ul>
+ *
+ * @param d
+ * @param executor
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> window(Duration d, Executor executor,
+ Function<Collection<T>,R> f);
+
+ /**
+ * Buffers a number of events over a variable time interval and then
+ * forwards the events to an accumulator function. The length of time over
+ * which events are buffered is determined by the time function. A maximum
+ * number of events can also be requested, if this number of events is
+ * reached then the accumulator will be called early. The accumulator
+ * function returns new event data to be forwarded on. It is also given the
+ * length of time for which the buffer accumulated data. This may be less
+ * than the requested interval if the buffer reached the maximum number of
+ * requested events early. Note that:
+ * <ul>
+ * <li>The collection forwarded to the accumulator function will be empty if
+ * no events arrived during the time interval.</li>
+ * <li>The accumulator function will be run and the forwarded event
+ * delivered as a different task, (and therefore potentially on a different
+ * thread) from the one that delivered the event to this {@link PushStream}.
+ * </li>
+ * <li>Due to the buffering and asynchronous delivery required, this method
+ * prevents the propagation of back-pressure to earlier stages</li>
+ * <li>If the window finishes by hitting the maximum number of events then
+ * the remaining time in the window will be applied as back-pressure to the
+ * previous stage, attempting to slow the producer to the expected windowing
+ * threshold.</li>
+ * </ul>
+ *
+ * @param timeSupplier
+ * @param maxEvents
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> window(Supplier<Duration> timeSupplier,
+ IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> f);
+
+ /**
+ * Buffers a number of events over a variable time interval and then
+ * forwards the events to an accumulator function. The length of time over
+ * which events are buffered is determined by the time function. A maximum
+ * number of events can also be requested, if this number of events is
+ * reached then the accumulator will be called early. The accumulator
+ * function returns new event data to be forwarded on. It is also given the
+ * length of time for which the buffer accumulated data. This may be less
+ * than the requested interval if the buffer reached the maximum number of
+ * requested events early. Note that:
+ * <ul>
+ * <li>The collection forwarded to the accumulator function will be empty if
+ * no events arrived during the time interval.</li>
+ * <li>The accumulator function will be run and the forwarded event
+ * delivered as a different task, (and therefore potentially on a different
+ * thread) from the one that delivered the event to this {@link PushStream}.
+ * </li>
+ * <li>If the window finishes by hitting the maximum number of events then
+ * the remaining time in the window will be applied as back-pressure to the
+ * previous stage, attempting to slow the producer to the expected windowing
+ * threshold.</li>
+ * </ul>
+ *
+ * @param timeSupplier
+ * @param maxEvents
+ * @param executor
+ * @param f
+ * @return Builder style (can be a new or the same object)
+ */
+ <R> PushStream<R> window(Supplier<Duration> timeSupplier,
+ IntSupplier maxEvents, Executor executor,
+ BiFunction<Long,Collection<T>,R> f);
+
+ /**
+ * Execute the action for each event received until the channel is closed.
+ * This is a terminating method, the returned promise is resolved when the
+ * channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param action The action to perform
+ * @return A promise that is resolved when the channel closes.
+ */
+ Promise<Void> forEach(Consumer< ? super T> action);
+
+ /**
+ * Collect the payloads in an Object array after the channel is closed. This
+ * is a terminating method, the returned promise is resolved when the
+ * channel is closed.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @return A promise that is resolved with all the payloads received over
+ * the channel
+ */
+ Promise<Object[]> toArray();
+
+ /**
+ * Collect the payloads in an Object array after the channel is closed. This
+ * is a terminating method, the returned promise is resolved when the
+ * channel is closed. The type of the array is handled by the caller using a
+ * generator function that gets the length of the desired array.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param generator
+ * @return A promise that is resolved with all the payloads received over
+ * the channel
+ */
+ <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator);
+
+ /**
+ * Standard reduce, see Stream. The returned promise will be resolved when
+ * the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param identity The identity/begin value
+ * @param accumulator The accumulator
+ * @return A
+ */
+ Promise<T> reduce(T identity, BinaryOperator<T> accumulator);
+
+ /**
+ * Standard reduce without identity, so the return is an Optional. The
+ * returned promise will be resolved when the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param accumulator The accumulator
+ * @return an Optional
+ */
+ Promise<Optional<T>> reduce(BinaryOperator<T> accumulator);
+
+ /**
+ * Standard reduce with identity, accumulator and combiner. The returned
+ * promise will be resolved when the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param identity
+ * @param accumulator
+ * @param combiner combines to U's into one U (e.g. how combine two lists)
+ * @return The promise
+ */
+ <U> Promise<U> reduce(U identity, BiFunction<U, ? super T,U> accumulator,
+ BinaryOperator<U> combiner);
+
+ /**
+ * See Stream. Will resolve onces the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param collector
+ * @return A Promise representing the collected results
+ */
+ <R, A> Promise<R> collect(Collector< ? super T,A,R> collector);
+
+ /**
+ * See Stream. Will resolve onces the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param comparator
+ * @return A Promise representing the minimum value, or null if no values
+ * are seen before the end of the stream
+ */
+ Promise<Optional<T>> min(Comparator< ? super T> comparator);
+
+ /**
+ * See Stream. Will resolve onces the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param comparator
+ * @return A Promise representing the maximum value, or null if no values
+ * are seen before the end of the stream
+ */
+ Promise<Optional<T>> max(Comparator< ? super T> comparator);
+
+ /**
+ * See Stream. Will resolve onces the channel closes.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @return A Promise representing the number of values in the stream
+ */
+ Promise<Long> count();
+
+ /**
+ * Close the channel and resolve the promise with true when the predicate
+ * matches a payload. If the channel is closed before the predicate matches,
+ * the promise is resolved with false.
+ * <p>
+ * This is a <strong>short circuiting terminal operation</strong>
+ *
+ * @param predicate
+ * @return A Promise that will resolve when an event matches the predicate,
+ * or the end of the stream is reached
+ */
+ Promise<Boolean> anyMatch(Predicate< ? super T> predicate);
+
+ /**
+ * Closes the channel and resolve the promise with false when the predicate
+ * does not matches a pay load.If the channel is closed before, the promise
+ * is resolved with true.
+ * <p>
+ * This is a <strong>short circuiting terminal operation</strong>
+ *
+ * @param predicate
+ * @return A Promise that will resolve when an event fails to match the
+ * predicate, or the end of the stream is reached
+ */
+ Promise<Boolean> allMatch(Predicate< ? super T> predicate);
+
+ /**
+ * Closes the channel and resolve the promise with false when the predicate
+ * matches any pay load. If the channel is closed before, the promise is
+ * resolved with true.
+ * <p>
+ * This is a <strong>short circuiting terminal operation</strong>
+ *
+ * @param predicate
+ * @return A Promise that will resolve when an event matches the predicate,
+ * or the end of the stream is reached
+ */
+ Promise<Boolean> noneMatch(Predicate< ? super T> predicate);
+
+ /**
+ * Close the channel and resolve the promise with the first element. If the
+ * channel is closed before, the Optional will have no value.
+ *
+ * @return a promise
+ */
+ Promise<Optional<T>> findFirst();
+
+ /**
+ * Close the channel and resolve the promise with the first element. If the
+ * channel is closed before, the Optional will have no value.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @return a promise
+ */
+ Promise<Optional<T>> findAny();
+
+ /**
+ * Pass on each event to another consumer until the stream is closed.
+ * <p>
+ * This is a <strong>terminal operation</strong>
+ *
+ * @param action
+ * @return a promise
+ */
+ Promise<Long> forEachEvent(PushEventConsumer< ? super T> action);
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilder.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilder.java
new file mode 100644
index 000000000..d59c8d9d3
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilder.java
@@ -0,0 +1,52 @@
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+/**
+ * A Builder for a PushStream. This Builder extends the support of a standard
+ * BufferBuilder by allowing the PushStream to be unbuffered.
+ *
+ *
+ * @param <T> The type of objects in the {@link PushEvent}
+ * @param <U> The type of the Queue used in the user specified buffer
+ */
+public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+ extends BufferBuilder<PushStream<T>,T,U> {
+
+ /**
+ * Tells this {@link PushStreamBuilder} to create an unbuffered stream which
+ * delivers events directly to its consumer using the incoming delivery
+ * thread.
+ *
+ * @return the builder
+ */
+ PushStreamBuilder<T,U> unbuffered();
+
+ /*
+ * Overridden methods to allow the covariant return of a PushStreamBuilder
+ */
+
+ @Override
+ PushStreamBuilder<T,U> withBuffer(U queue);
+
+ @Override
+ PushStreamBuilder<T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
+
+ @Override
+ PushStreamBuilder<T,U> withQueuePolicy(QueuePolicyOption queuePolicyOption);
+
+ @Override
+ PushStreamBuilder<T,U> withPushbackPolicy(
+ PushbackPolicy<T,U> pushbackPolicy);
+
+ @Override
+ PushStreamBuilder<T,U> withPushbackPolicy(
+ PushbackPolicyOption pushbackPolicyOption, long time);
+
+ @Override
+ PushStreamBuilder<T,U> withParallelism(int parallelism);
+
+ @Override
+ PushStreamBuilder<T,U> withExecutor(Executor executor);
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
new file mode 100644
index 000000000..5ec7cb336
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
@@ -0,0 +1,88 @@
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+ extends AbstractBufferBuilder<PushStream<T>,T,U>
+ implements PushStreamBuilder<T,U> {
+
+ private final PushStreamProvider psp;
+ private final PushEventSource<T> eventSource;
+ private final Executor previousExecutor;
+
+ private boolean unbuffered;
+
+ PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor,
+ PushEventSource<T> eventSource) {
+ this.psp = psp;
+ this.previousExecutor = defaultExecutor;
+ this.eventSource = eventSource;
+ this.worker = defaultExecutor;
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withBuffer(U queue) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withBuffer(queue);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withQueuePolicy(
+ QueuePolicy<T,U> queuePolicy) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withQueuePolicy(queuePolicy);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withQueuePolicy(
+ QueuePolicyOption queuePolicyOption) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withQueuePolicy(
+ queuePolicyOption);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withPushbackPolicy(
+ PushbackPolicy<T,U> pushbackPolicy) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
+ pushbackPolicy);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withPushbackPolicy(
+ PushbackPolicyOption pushbackPolicyOption, long time) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
+ pushbackPolicyOption, time);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withParallelism(int parallelism) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withParallelism(parallelism);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> withExecutor(Executor executor) {
+ unbuffered = false;
+ return (PushStreamBuilder<T,U>) super.withExecutor(executor);
+ }
+
+ @Override
+ public PushStreamBuilder<T,U> unbuffered() {
+ unbuffered = true;
+ return this;
+ }
+
+ @Override
+ public PushStream<T> create() {
+ if (unbuffered) {
+ return psp.createUnbufferedStream(eventSource, previousExecutor);
+ } else {
+ return psp.createStream(eventSource, concurrency, worker, buffer,
+ bufferingPolicy, backPressure);
+ }
+ }
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java
new file mode 100644
index 000000000..be87c6bce
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java
@@ -0,0 +1,581 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED;
+import static org.osgi.util.pushstream.PushEvent.*;
+import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR;
+import static org.osgi.util.pushstream.QueuePolicyOption.FAIL;
+
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+/**
+ * A factory for {@link PushStream} instances, and utility methods for handling
+ * {@link PushEventSource}s and {@link PushEventConsumer}s
+ */
+public final class PushStreamProvider {
+
+ private final Lock lock = new ReentrantLock(true);
+
+ private int schedulerReferences;
+
+ private ScheduledExecutorService scheduler;
+
+ private ScheduledExecutorService acquireScheduler() {
+ try {
+ lock.lockInterruptibly();
+ try {
+ schedulerReferences += 1;
+
+ if (schedulerReferences == 1) {
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ }
+ return scheduler;
+ } finally {
+ lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Unable to acquire the Scheduler",
+ e);
+ }
+ }
+
+ private void releaseScheduler() {
+ try {
+ lock.lockInterruptibly();
+ try {
+ schedulerReferences -= 1;
+
+ if (schedulerReferences == 0) {
+ scheduler.shutdown();
+ scheduler = null;
+ }
+ } finally {
+ lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Create a stream with the default configured buffer, executor size, queue,
+ * queue policy and pushback policy. This is equivalent to calling
+ *
+ * <code>
+ * buildStream(source).create();
+ * </code>
+ *
+ * <p>
+ * This stream will be buffered from the event producer, and will honour
+ * back pressure even if the source does not.
+ *
+ * <p>
+ * Buffered streams are useful for "bursty" event sources which produce a
+ * number of events close together, then none for some time. These bursts
+ * can sometimes overwhelm downstream processors. Buffering will not,
+ * however, protect downstream components from a source which produces
+ * events faster (on average) than they can be consumed.
+ *
+ * <p>
+ * Event delivery will not begin until a terminal operation is reached on
+ * the chain of AsyncStreams. Once a terminal operation is reached the
+ * stream will be connected to the event source.
+ *
+ * @param eventSource
+ * @return A {@link PushStream} with a default initial buffer
+ */
+ public <T> PushStream<T> createStream(PushEventSource<T> eventSource) {
+ return createStream(eventSource, 1, null, new ArrayBlockingQueue<>(32),
+ FAIL.getPolicy(), LINEAR.getPolicy(1000));
+ }
+
+ /**
+ * Builds a push stream with custom configuration.
+ *
+ * <p>
+ *
+ * The resulting {@link PushStream} may be buffered or unbuffered depending
+ * on how it is configured.
+ *
+ * @param eventSource The source of the events
+ *
+ * @return A {@link PushStreamBuilder} for the stream
+ */
+ public <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildStream(
+ PushEventSource<T> eventSource) {
+ return new PushStreamBuilderImpl<T,U>(this, null, eventSource);
+ }
+
+ @SuppressWarnings({
+ "rawtypes", "unchecked"
+ })
+ <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream(
+ PushEventSource<T> eventSource, int parallelism, Executor executor,
+ U queue, QueuePolicy<T,U> queuePolicy,
+ PushbackPolicy<T,U> pushbackPolicy) {
+
+ if (eventSource == null) {
+ throw new NullPointerException("There is no source of events");
+ }
+
+ if (parallelism < 0) {
+ throw new IllegalArgumentException(
+ "The supplied parallelism cannot be less than zero. It was "
+ + parallelism);
+ } else if (parallelism == 0) {
+ parallelism = 1;
+ }
+
+ boolean closeExecutorOnClose;
+ Executor toUse;
+ if (executor == null) {
+ toUse = Executors.newFixedThreadPool(parallelism);
+ closeExecutorOnClose = true;
+ } else {
+ toUse = executor;
+ closeExecutorOnClose = false;
+ }
+
+ if (queue == null) {
+ queue = (U) new ArrayBlockingQueue(32);
+ }
+
+ if (queuePolicy == null) {
+ queuePolicy = FAIL.getPolicy();
+ }
+
+ if (pushbackPolicy == null) {
+ pushbackPolicy = LINEAR.getPolicy(1000);
+ }
+
+ @SuppressWarnings("resource")
+ PushStream<T> stream = new BufferedPushStreamImpl<>(this,
+ acquireScheduler(), queue, parallelism, toUse, queuePolicy,
+ pushbackPolicy, aec -> {
+ try {
+ return eventSource.open(aec);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unable to connect to event source", e);
+ }
+ });
+
+ stream = stream.onClose(() -> {
+ if (closeExecutorOnClose) {
+ ((ExecutorService) toUse).shutdown();
+ }
+ releaseScheduler();
+ }).map(Function.identity());
+ return stream;
+ }
+
+ <T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource,
+ Executor executor) {
+
+ boolean closeExecutorOnClose;
+ Executor toUse;
+ if (executor == null) {
+ toUse = Executors.newFixedThreadPool(2);
+ closeExecutorOnClose = true;
+ } else {
+ toUse = executor;
+ closeExecutorOnClose = false;
+ }
+
+ @SuppressWarnings("resource")
+ PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, toUse,
+ acquireScheduler(), aec -> {
+ try {
+ return eventSource.open(aec);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unable to connect to event source", e);
+ }
+ });
+
+ stream = stream.onClose(() -> {
+ if (closeExecutorOnClose) {
+ ((ExecutorService) toUse).shutdown();
+ }
+ releaseScheduler();
+ }).map(Function.identity());
+
+ return stream;
+ }
+
+ /**
+ * Convert an {@link PushStream} into an {@link PushEventSource}. The first
+ * call to {@link PushEventSource#open(PushEventConsumer)} will begin event
+ * processing.
+ *
+ * The {@link PushEventSource} will remain active until the backing stream
+ * is closed, and permits multiple consumers to
+ * {@link PushEventSource#open(PushEventConsumer)} it.
+ *
+ * This is equivalent to: <code>
+ * buildEventSourceFromStream(stream).create();
+ * </code>
+ *
+ * @param stream
+ * @return a {@link PushEventSource} backed by the {@link PushStream}
+ */
+ public <T> PushEventSource<T> createEventSourceFromStream(
+ PushStream<T> stream) {
+ return buildEventSourceFromStream(stream).create();
+ }
+
+ /**
+ * Convert an {@link PushStream} into an {@link PushEventSource}. The first
+ * call to {@link PushEventSource#open(PushEventConsumer)} will begin event
+ * processing.
+ *
+ * The {@link PushEventSource} will remain active until the backing stream
+ * is closed, and permits multiple consumers to
+ * {@link PushEventSource#open(PushEventConsumer)} it.
+ *
+ * @param stream
+ *
+ * @return a {@link PushEventSource} backed by the {@link PushStream}
+ */
+ public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream(
+ PushStream<T> stream) {
+ return new AbstractBufferBuilder<PushEventSource<T>,T,U>() {
+ @Override
+ public PushEventSource<T> create() {
+ SimplePushEventSource<T> spes = createSimplePushEventSource(
+ concurrency, worker, buffer, bufferingPolicy, () -> {
+ try {
+ stream.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ });
+ spes.connectPromise()
+ .then(p -> stream.forEach(t -> spes.publish(t))
+ .onResolve(() -> spes.close()));
+ return spes;
+ }
+ };
+ }
+
+
+ /**
+ * Create a {@link SimplePushEventSource} with the supplied type and default
+ * buffering behaviours. The SimplePushEventSource will respond to back
+ * pressure requests from the consumers connected to it.
+ *
+ * This is equivalent to: <code>
+ * buildSimpleEventSource(type).create();
+ * </code>
+ *
+ * @param type
+ * @return a {@link SimplePushEventSource}
+ */
+ public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> type) {
+ return createSimplePushEventSource(1, null,
+ new ArrayBlockingQueue<>(32),
+ FAIL.getPolicy(), () -> { /* Nothing else to do */ });
+ }
+
+ /**
+ *
+ * Build a {@link SimplePushEventSource} with the supplied type and custom
+ * buffering behaviours. The SimplePushEventSource will respond to back
+ * pressure requests from the consumers connected to it.
+ *
+ * @param type
+ *
+ * @return a {@link SimplePushEventSource}
+ */
+
+ public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<SimplePushEventSource<T>,T,U> buildSimpleEventSource(
+ Class<T> type) {
+ return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() {
+ @Override
+ public SimplePushEventSource<T> create() {
+ return createSimplePushEventSource(concurrency, worker, buffer,
+ bufferingPolicy, () -> { /* Nothing else to do */ });
+ }
+ };
+ }
+
+ @SuppressWarnings({
+ "unchecked", "rawtypes"
+ })
+ <T, U extends BlockingQueue<PushEvent< ? extends T>>> SimplePushEventSource<T> createSimplePushEventSource(
+ int parallelism, Executor executor, U queue,
+ QueuePolicy<T,U> queuePolicy, Runnable onClose) {
+
+ if (parallelism < 0) {
+ throw new IllegalArgumentException(
+ "The supplied parallelism cannot be less than zero. It was "
+ + parallelism);
+ } else if (parallelism == 0) {
+ parallelism = 1;
+ }
+
+ boolean closeExecutorOnClose;
+ Executor toUse;
+ if (executor == null) {
+ toUse = Executors.newFixedThreadPool(2);
+ closeExecutorOnClose = true;
+ } else {
+ toUse = executor;
+ closeExecutorOnClose = false;
+ }
+
+ if (queue == null) {
+ queue = (U) new ArrayBlockingQueue(32);
+ }
+
+ if (queuePolicy == null) {
+ queuePolicy = FAIL.getPolicy();
+ }
+
+ SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>(
+ toUse, acquireScheduler(), queuePolicy, queue, parallelism,
+ () -> {
+ try {
+ onClose.run();
+ } catch (Exception e) {
+ // TODO log this?
+ }
+ if (closeExecutorOnClose) {
+ ((ExecutorService) toUse).shutdown();
+ }
+ releaseScheduler();
+ });
+ return spes;
+ }
+
+ /**
+ * Create a buffered {@link PushEventConsumer} with the default configured
+ * buffer, executor size, queue, queue policy and pushback policy. This is
+ * equivalent to calling
+ *
+ * <code>
+ * buildBufferedConsumer(delegate).create();
+ * </code>
+ *
+ * <p>
+ * The returned consumer will be buffered from the event source, and will
+ * honour back pressure requests from its delegate even if the event source
+ * does not.
+ *
+ * <p>
+ * Buffered consumers are useful for "bursty" event sources which produce a
+ * number of events close together, then none for some time. These bursts
+ * can sometimes overwhelm the consumer. Buffering will not, however,
+ * protect downstream components from a source which produces events faster
+ * than they can be consumed.
+ *
+ * @param delegate
+ * @return a {@link PushEventConsumer} with a buffer directly before it
+ */
+ public <T> PushEventConsumer<T> createBufferedConsumer(
+ PushEventConsumer<T> delegate) {
+ return buildBufferedConsumer(delegate).create();
+ }
+
+ /**
+ * Build a buffered {@link PushEventConsumer} with custom configuration.
+ * <p>
+ * The returned consumer will be buffered from the event source, and will
+ * honour back pressure requests from its delegate even if the event source
+ * does not.
+ * <p>
+ * Buffered consumers are useful for "bursty" event sources which produce a
+ * number of events close together, then none for some time. These bursts
+ * can sometimes overwhelm the consumer. Buffering will not, however,
+ * protect downstream components from a source which produces events faster
+ * than they can be consumed.
+ * <p>
+ * Buffers are also useful as "circuit breakers". If a
+ * {@link QueuePolicyOption#FAIL} is used then a full buffer will request
+ * that the stream close, preventing an event storm from reaching the
+ * client.
+ * <p>
+ * Note that this buffered consumer will close when it receives a terminal
+ * event, or if the delegate returns negative backpressure. No further
+ * events will be propagated after this time.
+ *
+ * @param delegate
+ * @return a {@link PushEventConsumer} with a buffer directly before it
+ */
+ public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventConsumer<T>,T,U> buildBufferedConsumer(
+ PushEventConsumer<T> delegate) {
+ return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() {
+ @Override
+ public PushEventConsumer<T> create() {
+ PushEventPipe<T> pipe = new PushEventPipe<>();
+
+ createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure)
+ .forEachEvent(delegate);
+
+ return pipe;
+ }
+ };
+ }
+
+ static final class PushEventPipe<T>
+ implements PushEventConsumer<T>, PushEventSource<T> {
+
+ volatile PushEventConsumer< ? super T> delegate;
+
+ @Override
+ public AutoCloseable open(PushEventConsumer< ? super T> pec)
+ throws Exception {
+ return () -> { /* Nothing else to do */ };
+ }
+
+ @Override
+ public long accept(PushEvent< ? extends T> event) throws Exception {
+ return delegate.accept(event);
+ }
+
+ }
+
+ /**
+ * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The
+ * data from the stream will be pushed into the PushStream synchronously as
+ * it is opened. This may make terminal operations blocking unless a buffer
+ * has been added to the {@link PushStream}. Care should be taken with
+ * infinite {@link Stream}s to avoid blocking indefinitely.
+ *
+ * @param items The items to push into the PushStream
+ * @return A PushStream containing the items from the Java Stream
+ */
+ public <T> PushStream<T> streamOf(Stream<T> items) {
+ PushEventSource<T> pes = aec -> {
+ AtomicBoolean closed = new AtomicBoolean(false);
+
+ items.mapToLong(i -> {
+ try {
+ long returnValue = closed.get() ? -1 : aec.accept(data(i));
+ if (returnValue < 0) {
+ aec.accept(PushEvent.<T> close());
+ }
+ return returnValue;
+ } catch (Exception e) {
+ try {
+ aec.accept(PushEvent.<T> error(e));
+ } catch (Exception e2) {/* No further events needed */}
+ return -1;
+ }
+ }).filter(i -> i < 0).findFirst().orElseGet(() -> {
+ try {
+ return aec.accept(PushEvent.<T> close());
+ } catch (Exception e) {
+ return -1;
+ }
+ });
+
+ return () -> closed.set(true);
+ };
+
+ return this.<T> createUnbufferedStream(pes, null);
+ }
+
+ /**
+ * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The
+ * data from the stream will be pushed into the PushStream asynchronously
+ * using the supplied Executor.
+ *
+ * @param executor The worker to use to push items from the Stream into the
+ * PushStream
+ * @param items The items to push into the PushStream
+ * @return A PushStream containing the items from the Java Stream
+ */
+ public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) {
+
+ boolean closeExecutorOnClose;
+ Executor toUse;
+ if (executor == null) {
+ toUse = Executors.newFixedThreadPool(2);
+ closeExecutorOnClose = true;
+ } else {
+ toUse = executor;
+ closeExecutorOnClose = false;
+ }
+
+ @SuppressWarnings("resource")
+ PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>(
+ this, toUse, acquireScheduler(), aec -> {
+ return () -> { /* No action to take */ };
+ }) {
+
+ @Override
+ protected boolean begin() {
+ if (super.begin()) {
+ Iterator<T> it = items.iterator();
+
+ toUse.execute(() -> pushData(it));
+
+ return true;
+ }
+ return false;
+ }
+
+ private void pushData(Iterator<T> it) {
+ while (it.hasNext()) {
+ try {
+ long returnValue = closed.get() == CLOSED ? -1
+ : handleEvent(data(it.next()));
+ if (returnValue != 0) {
+ if (returnValue < 0) {
+ close();
+ return;
+ } else {
+ scheduler.schedule(
+ () -> toUse.execute(() -> pushData(it)),
+ returnValue, MILLISECONDS);
+ return;
+ }
+ }
+ } catch (Exception e) {
+ close(error(e));
+ }
+ }
+ close();
+ }
+ };
+
+ stream = stream.onClose(() -> {
+ if (closeExecutorOnClose) {
+ ((ExecutorService) toUse).shutdown();
+ }
+ releaseScheduler();
+ }).map(Function.identity());
+
+ return stream;
+ }
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicy.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicy.java
new file mode 100644
index 000000000..4f7f1864f
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicy.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * A {@link PushbackPolicy} is used to calculate how much back pressure to apply
+ * based on the current buffer. The {@link PushbackPolicy} will be called after
+ * an event has been queued, and the returned value will be used as back
+ * pressure.
+ *
+ * @see PushbackPolicyOption
+ *
+ *
+ * @param <T> The type of the data
+ * @param <U> The type of the queue
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushbackPolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> {
+
+ /**
+ * Given the current state of the queue, determine the level of back
+ * pressure that should be applied
+ *
+ * @param queue
+ * @return a back pressure value in nanoseconds
+ * @throws Exception
+ */
+ public long pushback(U queue) throws Exception;
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicyOption.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicyOption.java
new file mode 100644
index 000000000..ecd0e3ea3
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicyOption.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link PushbackPolicyOption} provides a standard set of simple
+ * {@link PushbackPolicy} implementations.
+ *
+ * @see PushbackPolicy
+ */
+public enum PushbackPolicyOption {
+
+ /**
+ * Returns a fixed amount of back pressure, independent of how full the
+ * buffer is
+ */
+ FIXED {
+ @Override
+ public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+ return q -> value;
+ }
+ },
+ /**
+ * Returns zero back pressure until the buffer is full, then it returns a
+ * fixed value
+ */
+ ON_FULL_FIXED {
+ @Override
+ public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+ return q -> q.remainingCapacity() == 0 ? value : 0;
+ }
+ },
+ /**
+ * Returns zero back pressure until the buffer is full, then it returns an
+ * exponentially increasing amount, starting with the supplied value and
+ * doubling it each time. Once the buffer is no longer full the back
+ * pressure returns to zero.
+ */
+ ON_FULL_EXPONENTIAL {
+ @Override
+ public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+ AtomicInteger backoffCount = new AtomicInteger(0);
+ return q -> {
+ if (q.remainingCapacity() == 0) {
+ return value << backoffCount.getAndIncrement();
+ }
+ backoffCount.set(0);
+ return 0;
+ };
+
+ }
+ },
+ /**
+ * Returns zero back pressure when the buffer is empty, then it returns a
+ * linearly increasing amount of back pressure based on how full the buffer
+ * is. The maximum value will be returned when the buffer is full.
+ */
+ LINEAR {
+ @Override
+ public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+ return q -> {
+ long remainingCapacity = q.remainingCapacity();
+ long used = q.size();
+ return (value * used) / (used + remainingCapacity);
+ };
+ }
+ };
+
+ /**
+ * Create a {@link PushbackPolicy} instance configured with a base back
+ * pressure time in nanoseconds
+ *
+ * The actual backpressure returned will vary based on the selected
+ * implementation, the base value, and the state of the buffer.
+ *
+ * @param value
+ * @return A {@link PushbackPolicy} to use
+ */
+ public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value);
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicy.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicy.java
new file mode 100644
index 000000000..cba94b16c
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicy.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.osgi.annotation.versioning.ConsumerType;
+import org.osgi.util.pushstream.PushEvent.EventType;
+
+/**
+ * A {@link QueuePolicy} is used to control how events should be queued in the
+ * current buffer. The {@link QueuePolicy} will be called when an event has
+ * arrived.
+ *
+ * @see QueuePolicyOption
+ *
+ *
+ * @param <T> The type of the data
+ * @param <U> The type of the queue
+ */
+
+@ConsumerType
+@FunctionalInterface
+public interface QueuePolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> {
+
+ /**
+ * Enqueue the event and return the remaining capacity available for events
+ *
+ * @param queue
+ * @param event
+ * @throws Exception If an error ocurred adding the event to the queue. This
+ * exception will cause the connection between the
+ * {@link PushEventSource} and the {@link PushEventConsumer} to be
+ * closed with an {@link EventType#ERROR}
+ */
+ public void doOffer(U queue, PushEvent<? extends T> event) throws Exception;
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicyOption.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicyOption.java
new file mode 100644
index 000000000..35df890ee
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicyOption.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * {@link QueuePolicyOption} provides a standard set of simple
+ * {@link QueuePolicy} implementations.
+ *
+ * @see QueuePolicy
+ */
+public enum QueuePolicyOption {
+ /**
+ * Attempt to add the supplied event to the queue. If the queue is unable to
+ * immediately accept the value then discard the value at the head of the
+ * queue and try again. Repeat this process until the event is enqueued.
+ */
+ DISCARD_OLDEST {
+ @Override
+ public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
+ return (queue, event) -> {
+ while (!queue.offer(event)) {
+ queue.poll();
+ }
+ };
+ }
+ },
+ /**
+ * Attempt to add the supplied event to the queue, blocking until the
+ * enqueue is successful.
+ */
+ BLOCK {
+ @Override
+ public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
+ return (queue, event) -> {
+ try {
+ queue.put(event);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ };
+ }
+ },
+ /**
+ * Attempt to add the supplied event to the queue, throwing an exception if
+ * the queue is full.
+ */
+ FAIL {
+ @Override
+ public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
+ return (queue, event) -> queue.add(event);
+ }
+ };
+
+ /**
+ * @return a {@link QueuePolicy} implementation
+ */
+ public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy();
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSource.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSource.java
new file mode 100644
index 000000000..747b4530d
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSource.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.promise.Promise;
+
+/**
+ * A {@link SimplePushEventSource} is a helper that makes it simpler to write a
+ * {@link PushEventSource}. Users do not need to manage multiple registrations
+ * to the stream, nor do they have to be concerned with back pressure.
+ *
+ * @param <T> The type of the events produced by this source
+ */
+@ProviderType
+public interface SimplePushEventSource<T>
+ extends PushEventSource<T>, AutoCloseable {
+ /**
+ * Close this source. Calling this method indicates that there will never be
+ * any more events published by it. Calling this method sends a close event
+ * to all connected consumers. After calling this method any
+ * {@link PushEventConsumer} that tries to {@link #open(PushEventConsumer)}
+ * this source will immediately receive a close event.
+ */
+ @Override
+ void close();
+
+ /**
+ * Asynchronously publish an event to this stream and all connected
+ * {@link PushEventConsumer} instances. When this method returns there is no
+ * guarantee that all consumers have been notified. Events published by a
+ * single thread will maintain their relative ordering, however they may be
+ * interleaved with events from other threads.
+ *
+ * @param t
+ * @throws IllegalStateException if the source is closed
+ */
+ void publish(T t);
+
+ /**
+ * Close this source for now, but potentially reopen it later. Calling this
+ * method asynchronously sends a close event to all connected consumers.
+ * After calling this method any {@link PushEventConsumer} that wishes may
+ * {@link #open(PushEventConsumer)} this source, and will receive subsequent
+ * events.
+ */
+ void endOfStream();
+
+ /**
+ * Close this source for now, but potentially reopen it later. Calling this
+ * method asynchronously sends an error event to all connected consumers.
+ * After calling this method any {@link PushEventConsumer} that wishes may
+ * {@link #open(PushEventConsumer)} this source, and will receive subsequent
+ * events.
+ *
+ * @param e the error
+ */
+ void error(Exception e);
+
+ /**
+ * Determine whether there are any {@link PushEventConsumer}s for this
+ * {@link PushEventSource}. This can be used to skip expensive event
+ * creation logic when there are no listeners.
+ *
+ * @return true if any consumers are currently connected
+ */
+ boolean isConnected();
+
+ /**
+ * This method can be used to delay event generation until an event source
+ * has connected. The returned promise will resolve as soon as one or more
+ * {@link PushEventConsumer} instances have opened the
+ * SimplePushEventSource.
+ * <p>
+ * The returned promise may already be resolved if this
+ * {@link SimplePushEventSource} already has connected consumers. If the
+ * {@link SimplePushEventSource} is closed before the returned Promise
+ * resolves then it will be failed with an {@link IllegalStateException}.
+ * <p>
+ * Note that the connected consumers are able to asynchronously close their
+ * connections to this {@link SimplePushEventSource}, and therefore it is
+ * possible that once the promise resolves this
+ * {@link SimplePushEventSource} may no longer be connected to any
+ * consumers.
+ *
+ * @return A promise representing the connection state of this EventSource
+ */
+ Promise<Void> connectPromise();
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
new file mode 100644
index 000000000..e31c9bf59
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
@@ -0,0 +1,337 @@
+package org.osgi.util.pushstream;
+
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.Promises;
+
+class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+ implements SimplePushEventSource<T> {
+
+ private final Object lock = new Object();
+
+ private final Executor worker;
+
+ private final ScheduledExecutorService scheduler;
+
+ private final QueuePolicy<T,U> queuePolicy;
+
+ private final U queue;
+
+ private final int parallelism;
+
+ private final Semaphore semaphore;
+
+ private final List<PushEventConsumer< ? super T>> connected = new ArrayList<>();
+
+ private final Runnable onClose;
+
+ private boolean closed;
+
+ private Deferred<Void> connectPromise;
+
+ private boolean waitForFinishes;
+
+
+ public SimplePushEventSourceImpl(Executor worker,
+ ScheduledExecutorService scheduler, QueuePolicy<T,U> queuePolicy,
+ U queue, int parallelism, Runnable onClose) {
+ this.worker = worker;
+ this.scheduler = scheduler;
+ this.queuePolicy = queuePolicy;
+ this.queue = queue;
+ this.parallelism = parallelism;
+ this.semaphore = new Semaphore(parallelism);
+ this.onClose = onClose;
+ this.closed = false;
+ this.connectPromise = null;
+ }
+
+ @Override
+ public AutoCloseable open(PushEventConsumer< ? super T> pec)
+ throws Exception {
+ Deferred<Void> toResolve = null;
+ synchronized (lock) {
+ if (closed) {
+ throw new IllegalStateException(
+ "This PushEventConsumer is closed");
+ }
+
+ toResolve = connectPromise;
+ connectPromise = null;
+
+ connected.add(pec);
+ }
+
+ if (toResolve != null) {
+ toResolve.resolve(null);
+ }
+
+ return () -> {
+ closeConsumer(pec, PushEvent.close());
+ };
+ }
+
+ private void closeConsumer(PushEventConsumer< ? super T> pec,
+ PushEvent<T> event) {
+ boolean sendClose;
+ synchronized (lock) {
+ sendClose = connected.remove(pec);
+ }
+ if (sendClose) {
+ doSend(pec, event);
+ }
+ }
+
+ private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) {
+ try {
+ worker.execute(() -> safePush(pec, event));
+ } catch (RejectedExecutionException ree) {
+ // TODO log?
+ if (!event.isTerminal()) {
+ close(PushEvent.error(ree));
+ } else {
+ safePush(pec, event);
+ }
+ }
+ }
+
+ @SuppressWarnings("boxing")
+ private Promise<Long> doSendWithBackPressure(
+ PushEventConsumer< ? super T> pec, PushEvent<T> event) {
+ Deferred<Long> d = new Deferred<>();
+ try {
+ worker.execute(
+ () -> d.resolve(System.nanoTime() + safePush(pec, event)));
+ } catch (RejectedExecutionException ree) {
+ // TODO log?
+ if (!event.isTerminal()) {
+ close(PushEvent.error(ree));
+ return Promises.resolved(System.nanoTime());
+ } else {
+ return Promises
+ .resolved(System.nanoTime() + safePush(pec, event));
+ }
+ }
+ return d.getPromise();
+ }
+
+ private long safePush(PushEventConsumer< ? super T> pec,
+ PushEvent<T> event) {
+ try {
+ long backpressure = pec.accept(event) * 1000000;
+ if (backpressure < 0 && !event.isTerminal()) {
+ closeConsumer(pec, PushEvent.close());
+ return -1;
+ }
+ return backpressure;
+ } catch (Exception e) {
+ // TODO log?
+ if (!event.isTerminal()) {
+ closeConsumer(pec, PushEvent.error(e));
+ }
+ return -1;
+ }
+ }
+
+ @Override
+ public void close() {
+ close(PushEvent.close());
+ }
+
+ private void close(PushEvent<T> event) {
+ List<PushEventConsumer< ? super T>> toClose;
+ Deferred<Void> toFail = null;
+ synchronized (lock) {
+ if(!closed) {
+ closed = true;
+
+ toClose = new ArrayList<>(connected);
+ connected.clear();
+ queue.clear();
+
+ if(connectPromise != null) {
+ toFail = connectPromise;
+ connectPromise = null;
+ }
+ } else {
+ toClose = emptyList();
+ }
+ }
+
+ toClose.stream().forEach(pec -> doSend(pec, event));
+
+ if (toFail != null) {
+ toFail.resolveWith(closedConnectPromise());
+ }
+
+ onClose.run();
+ }
+
+ @Override
+ public void publish(T t) {
+ enqueueEvent(PushEvent.data(t));
+ }
+
+ @Override
+ public void endOfStream() {
+ enqueueEvent(PushEvent.close());
+ }
+
+ @Override
+ public void error(Exception e) {
+ enqueueEvent(PushEvent.error(e));
+ }
+
+ private void enqueueEvent(PushEvent<T> event) {
+ synchronized (lock) {
+ if (closed || connected.isEmpty()) {
+ return;
+ }
+ }
+
+ try {
+ queuePolicy.doOffer(queue, event);
+ boolean start;
+ synchronized (lock) {
+ start = !waitForFinishes && semaphore.tryAcquire();
+ }
+ if (start) {
+ startWorker();
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ throw new IllegalStateException(
+ "The queue policy threw an exception", e);
+ }
+ }
+
+ @SuppressWarnings({
+ "unchecked", "boxing"
+ })
+ private void startWorker() {
+ worker.execute(() -> {
+ try {
+
+ for(;;) {
+ PushEvent<T> event;
+ List<PushEventConsumer< ? super T>> toCall;
+ boolean resetWait = false;
+ synchronized (lock) {
+ if(waitForFinishes) {
+ semaphore.release();
+ while(waitForFinishes) {
+ lock.notifyAll();
+ lock.wait();
+ }
+ semaphore.acquire();
+ }
+
+ event = (PushEvent<T>) queue.poll();
+
+ if(event == null) {
+ break;
+ }
+
+ toCall = new ArrayList<>(connected);
+ if (event.isTerminal()) {
+ waitForFinishes = true;
+ resetWait = true;
+ connected.clear();
+ while (!semaphore.tryAcquire(parallelism - 1)) {
+ lock.wait();
+ }
+ }
+ }
+
+ List<Promise<Long>> calls = toCall.stream().map(pec -> {
+ if (semaphore.tryAcquire()) {
+ try {
+ return doSendWithBackPressure(pec, event);
+ } finally {
+ semaphore.release();
+ }
+ } else {
+ return Promises.resolved(
+ System.nanoTime() + safePush(pec, event));
+ }
+ }).collect(toList());
+
+ long toWait = Promises.all(calls)
+ .map(l -> l.stream()
+ .max(Long::compareTo)
+ .orElseGet(() -> System.nanoTime()))
+ .getValue() - System.nanoTime();
+
+
+ if (toWait > 0) {
+ scheduler.schedule(this::startWorker, toWait,
+ NANOSECONDS);
+ return;
+ }
+
+ if (resetWait == true) {
+ synchronized (lock) {
+ waitForFinishes = false;
+ lock.notifyAll();
+ }
+ }
+ }
+
+ semaphore.release();
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ if (queue.peek() != null && semaphore.tryAcquire()) {
+ try {
+ startWorker();
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ }
+ });
+
+ }
+
+ @Override
+ public boolean isConnected() {
+ synchronized (lock) {
+ return !connected.isEmpty();
+ }
+ }
+
+ @Override
+ public Promise<Void> connectPromise() {
+ synchronized (lock) {
+ if (closed) {
+ return closedConnectPromise();
+ }
+
+ if (connected.isEmpty()) {
+ if (connectPromise == null) {
+ connectPromise = new Deferred<>();
+ }
+ return connectPromise.getPromise();
+ } else {
+ return Promises.resolved(null);
+ }
+ }
+ }
+
+ private Promise<Void> closedConnectPromise() {
+ return Promises.failed(new IllegalStateException(
+ "This SimplePushEventSource is closed"));
+ }
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
new file mode 100644
index 000000000..faf9e6584
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
@@ -0,0 +1,73 @@
+package org.osgi.util.pushstream;
+
+import static java.util.Optional.ofNullable;
+import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+ extends AbstractPushStreamImpl<T> implements PushStream<T> {
+
+ protected final Function<PushEventConsumer<T>,AutoCloseable> connector;
+
+ protected final AtomicReference<AutoCloseable> upstream = new AtomicReference<AutoCloseable>();
+
+ UnbufferedPushStreamImpl(PushStreamProvider psp,
+ Executor executor, ScheduledExecutorService scheduler,
+ Function<PushEventConsumer<T>,AutoCloseable> connector) {
+ super(psp, executor, scheduler);
+ this.connector = connector;
+ }
+
+ @Override
+ protected boolean close(PushEvent<T> event) {
+ if(super.close(event)) {
+ ofNullable(upstream.getAndSet(() -> {
+ // This block doesn't need to do anything, but the presence
+ // of the Closable is needed to prevent duplicate begins
+ })).ifPresent(c -> {
+ try {
+ c.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ });
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected boolean begin() {
+ if(closed.compareAndSet(BUILDING, STARTED)) {
+ AutoCloseable toClose = connector.apply(this::handleEvent);
+ if(!upstream.compareAndSet(null,toClose)) {
+ //TODO log that we tried to connect twice...
+ try {
+ toClose.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ if (closed.get() == CLOSED
+ && upstream.compareAndSet(toClose, null)) {
+ // We closed before setting the upstream - close it now
+ try {
+ toClose.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/package-info.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/package-info.java
new file mode 100644
index 000000000..6a28fa0b5
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/package-info.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Push Stream Package Version 1.0.
+ *
+ * <p>
+ * Bundles wishing to use this package must list the package in the
+ * Import-Package header of the bundle's manifest.
+ *
+ * <p>
+ * Example import for consumers using the API in this package:
+ * <p>
+ * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,2.0)"}
+ * <p>
+ * Example import for providers implementing the API in this package:
+ * <p>
+ * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,1.1)"}
+ *
+ * @author $Id$
+ */
+
+@Version("1.0")
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.Version;

Back to the top