diff options
author | Thomas Watson | 2017-04-28 18:31:25 +0000 |
---|---|---|
committer | Thomas Watson | 2017-06-16 12:38:08 +0000 |
commit | b0b632a4b575f3d996560ae5915370025d18234d (patch) | |
tree | 2bf99e7fa8146b5247356a195ac7510ec92c8ab9 | |
parent | 9c940504036225a9c7d3f869f6d1ca0f55083e50 (diff) | |
download | rt.equinox.framework-b0b632a4b575f3d996560ae5915370025d18234d.tar.gz rt.equinox.framework-b0b632a4b575f3d996560ae5915370025d18234d.tar.xz rt.equinox.framework-b0b632a4b575f3d996560ae5915370025d18234d.zip |
Add pushstream API and fix import ranges
Change-Id: I2bd80833a975650125ee2bf69900d2f99de4f0ce
Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
21 files changed, 4252 insertions, 5 deletions
diff --git a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF index 3568239b1..74f3a73cc 100644 --- a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF +++ b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF @@ -5,9 +5,10 @@ Bundle-SymbolicName: org.eclipse.equinox.log.stream Bundle-Version: 1.0.0.qualifier Bundle-Activator: org.eclipse.equinox.internal.log.stream.LogStreamFactoryImpl Bundle-RequiredExecutionEnvironment: JavaSE-1.8 -Import-Package: org.osgi.framework;version="1.3.0", - org.osgi.service.log;version="1.4.0", - org.osgi.util.promise;version="1.0.0", - org.osgi.util.pushstream;version="1.0.0", - org.osgi.util.tracker;version="1.5.0" +Import-Package: org.osgi.framework;version="[1.9.0,2.0.0)", + org.osgi.service.log;version="[1.4.0,2.0.0)", + org.osgi.util.promise;version="[1.0.0,2.0.0)", + org.osgi.util.tracker;version="[1.5.0,2.0.0)" Bundle-ActivationPolicy: lazy +Export-Package: org.osgi.service.log.stream;version="1.0.0", + org.osgi.util.pushstream;version="1.0.0" diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractBufferBuilder.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractBufferBuilder.java new file mode 100644 index 000000000..a37e407fd --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractBufferBuilder.java @@ -0,0 +1,60 @@ +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent< ? extends T>>> + implements BufferBuilder<R,T,U> { + + protected Executor worker; + protected int concurrency; + protected PushbackPolicy<T,U> backPressure; + protected QueuePolicy<T,U> bufferingPolicy; + protected U buffer; + + @Override + public BufferBuilder<R,T,U> withBuffer(U queue) { + this.buffer = queue; + return this; + } + + @Override + public BufferBuilder<R,T,U> withQueuePolicy( + QueuePolicy<T,U> queuePolicy) { + this.bufferingPolicy = queuePolicy; + return this; + } + + @Override + public BufferBuilder<R,T,U> withQueuePolicy( + QueuePolicyOption queuePolicyOption) { + this.bufferingPolicy = queuePolicyOption.getPolicy(); + return this; + } + + @Override + public BufferBuilder<R,T,U> withPushbackPolicy( + PushbackPolicy<T,U> pushbackPolicy) { + this.backPressure = pushbackPolicy; + return this; + } + + @Override + public BufferBuilder<R,T,U> withPushbackPolicy( + PushbackPolicyOption pushbackPolicyOption, long time) { + this.backPressure = pushbackPolicyOption.getPolicy(time); + return this; + } + + @Override + public BufferBuilder<R,T,U> withParallelism(int parallelism) { + this.concurrency = parallelism; + return this; + } + + @Override + public BufferBuilder<R,T,U> withExecutor(Executor executor) { + this.worker = executor; + return this; + } +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java new file mode 100644 index 000000000..2293c1aad --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java @@ -0,0 +1,1480 @@ +package org.osgi.util.pushstream; + +import static java.util.Collections.emptyList; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*; +import static org.osgi.util.pushstream.PushEventConsumer.*; + +import java.time.Duration; +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.IntSupplier; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +import org.osgi.util.promise.Deferred; +import org.osgi.util.promise.Promise; +import org.osgi.util.promise.TimeoutException; +import org.osgi.util.pushstream.PushEvent.EventType; + +abstract class AbstractPushStreamImpl<T> implements PushStream<T> { + + static enum State { + BUILDING, STARTED, CLOSED + } + + protected final PushStreamProvider psp; + + protected final Executor defaultExecutor; + protected final ScheduledExecutorService scheduler; + + protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING); + + protected final AtomicReference<PushEventConsumer<T>> next = new AtomicReference<>(); + + protected final AtomicReference<Runnable> onCloseCallback = new AtomicReference<>(); + protected final AtomicReference<Consumer<? super Throwable>> onErrorCallback = new AtomicReference<>(); + + protected abstract boolean begin(); + + AbstractPushStreamImpl(PushStreamProvider psp, + Executor executor, ScheduledExecutorService scheduler) { + this.psp = psp; + this.defaultExecutor = executor; + this.scheduler = scheduler; + } + + protected long handleEvent(PushEvent< ? extends T> event) { + if(closed.get() != CLOSED) { + try { + if(event.isTerminal()) { + close(event.nodata()); + return ABORT; + } else { + PushEventConsumer<T> consumer = next.get(); + long val; + if(consumer == null) { + //TODO log a warning + val = CONTINUE; + } else { + val = consumer.accept(event); + } + if(val < 0) { + close(); + } + return val; + } + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + } + return ABORT; + } + + @Override + public void close() { + close(PushEvent.close()); + } + + protected boolean close(PushEvent<T> event) { + if(!event.isTerminal()) { + throw new IllegalArgumentException("The event " + event + " is not a close event."); + } + if(closed.getAndSet(CLOSED) != CLOSED) { + PushEventConsumer<T> aec = next.getAndSet(null); + if(aec != null) { + try { + aec.accept(event); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + Runnable handler = onCloseCallback.getAndSet(null); + if(handler != null) { + try { + handler.run(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + if (event.getType() == EventType.ERROR) { + Consumer<? super Throwable> errorHandler = onErrorCallback.getAndSet(null); + if(errorHandler != null) { + try { + errorHandler.accept(event.getFailure()); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + return true; + } + return false; + } + + @Override + public PushStream<T> onClose(Runnable closeHandler) { + if(onCloseCallback.compareAndSet(null, closeHandler)) { + if(closed.get() == State.CLOSED && onCloseCallback.compareAndSet(closeHandler, null)) { + closeHandler.run(); + } + } else { + throw new IllegalStateException("A close handler has already been defined for this stream object"); + } + return this; + } + + @Override + public PushStream<T> onError(Consumer< ? super Throwable> closeHandler) { + if(onErrorCallback.compareAndSet(null, closeHandler)) { + if(closed.get() == State.CLOSED) { + //TODO log already closed + onErrorCallback.set(null); + } + } else { + throw new IllegalStateException("A close handler has already been defined for this stream object"); + } + return this; + } + + private void updateNext(PushEventConsumer<T> consumer) { + if(!next.compareAndSet(null, consumer)) { + throw new IllegalStateException("This stream has already been chained"); + } else if(closed.get() == CLOSED && next.compareAndSet(consumer, null)) { + try { + consumer.accept(PushEvent.close()); + } catch (Exception e) { + //TODO log + e.printStackTrace(); + } + } + } + + @Override + public PushStream<T> filter(Predicate< ? super T> predicate) { + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( + psp, defaultExecutor, scheduler, this); + updateNext((event) -> { + try { + if (!event.isTerminal()) { + if (predicate.test(event.getData())) { + return eventStream.handleEvent(event); + } else { + return CONTINUE; + } + } + return eventStream.handleEvent(event); + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override + public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) { + + AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>( + psp, defaultExecutor, scheduler, this); + updateNext(event -> { + try { + if (!event.isTerminal()) { + return eventStream.handleEvent( + PushEvent.data(mapper.apply(event.getData()))); + } else { + return eventStream.handleEvent(event.nodata()); + } + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override + public <R> PushStream<R> flatMap( + Function< ? super T, ? extends PushStream< ? extends R>> mapper) { + AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>( + psp, defaultExecutor, scheduler, this); + + PushEventConsumer<R> consumer = e -> { + switch (e.getType()) { + case ERROR : + close(e.nodata()); + return ABORT; + case CLOSE : + // Close should allow the next flat mapped entry + // without closing the stream; + return ABORT; + case DATA : + long returnValue = eventStream.handleEvent(e); + if (returnValue < 0) { + close(); + return ABORT; + } + return returnValue; + default : + throw new IllegalArgumentException( + "The event type " + e.getType() + " is unknown"); + } + }; + + updateNext(event -> { + try { + if (!event.isTerminal()) { + PushStream< ? extends R> mappedStream = mapper + .apply(event.getData()); + + return mappedStream.forEachEvent(consumer) + .getValue() + .longValue(); + } else { + return eventStream.handleEvent(event.nodata()); + } + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override + public PushStream<T> distinct() { + Set<T> set = Collections.<T>newSetFromMap(new ConcurrentHashMap<>()); + return filter(set::add); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public PushStream<T> sorted() { + return sorted((Comparator)Comparator.naturalOrder()); + } + + @Override + public PushStream<T> sorted(Comparator< ? super T> comparator) { + List<T> list = Collections.synchronizedList(new ArrayList<>()); + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( + psp, defaultExecutor, scheduler, this); + updateNext(event -> { + try { + switch(event.getType()) { + case DATA : + list.add(event.getData()); + return CONTINUE; + case CLOSE : + list.sort(comparator); + sorted: for (T t : list) { + if (eventStream + .handleEvent(PushEvent.data(t)) < 0) { + break sorted; + } + } + // Fall through + case ERROR : + eventStream.handleEvent(event); + return ABORT; + } + return eventStream.handleEvent(event.nodata()); + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override + public PushStream<T> limit(long maxSize) { + if(maxSize <= 0) { + throw new IllegalArgumentException("The limit must be greater than zero"); + } + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( + psp, defaultExecutor, scheduler, this); + AtomicLong counter = new AtomicLong(maxSize); + updateNext(event -> { + try { + if (!event.isTerminal()) { + long count = counter.decrementAndGet(); + if (count > 0) { + return eventStream.handleEvent(event); + } else if (count == 0) { + eventStream.handleEvent(event); + } + return ABORT; + } else { + return eventStream.handleEvent(event.nodata()); + } + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override + public PushStream<T> limit(Duration maxTime) { + + Runnable start = () -> scheduler.schedule(() -> close(), + maxTime.toNanos(), NANOSECONDS); + + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>( + psp, defaultExecutor, scheduler, this) { + @Override + protected void beginning() { + start.run(); + } + }; + updateNext((event) -> { + try { + return eventStream.handleEvent(event); + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override + public PushStream<T> timeout(Duration maxTime) { + + AtomicLong lastTime = new AtomicLong(); + long timeout = maxTime.toNanos(); + + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>( + psp, defaultExecutor, scheduler, this) { + @Override + protected void beginning() { + lastTime.set(System.nanoTime()); + scheduler.schedule(() -> check(lastTime, timeout), timeout, + NANOSECONDS); + } + }; + updateNext((event) -> { + try { + return eventStream.handleEvent(event); + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + void check(AtomicLong lastTime, long timeout) { + long now = System.nanoTime(); + + long elapsed = now - lastTime.get(); + + if (elapsed < timeout) { + scheduler.schedule(() -> check(lastTime, timeout), + timeout - elapsed, NANOSECONDS); + } else { + close(PushEvent.error(new TimeoutException())); + } + } + + @Override + public PushStream<T> skip(long n) { + if (n < 0) { + throw new IllegalArgumentException( + "The number to skip must be greater than or equal to zero"); + } + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( + psp, defaultExecutor, scheduler, this); + AtomicLong counter = new AtomicLong(n); + updateNext(event -> { + try { + if (!event.isTerminal()) { + if (counter.get() > 0 && counter.decrementAndGet() >= 0) { + return CONTINUE; + } else { + return eventStream.handleEvent(event); + } + } else { + return eventStream.handleEvent(event.nodata()); + } + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override + public PushStream<T> fork(int n, int delay, Executor ex) { + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( + psp, ex, scheduler, this); + Semaphore s = new Semaphore(n); + updateNext(event -> { + try { + if (event.isTerminal()) { + s.acquire(n); + eventStream.close(event.nodata()); + return ABORT; + } + + s.acquire(1); + + ex.execute(() -> { + try { + if (eventStream.handleEvent(event) < 0) { + eventStream.close(PushEvent.close()); + } + } catch (Exception e1) { + close(PushEvent.error(e1)); + } finally { + s.release(1); + } + }); + + return s.getQueueLength() * delay; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + + return eventStream; + } + + @Override + public PushStream<T> buffer() { + return psp.createStream(c -> { + forEachEvent(c); + return this; + }); + } + + @Override + public <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer() { + return psp.buildStream(c -> { + forEachEvent(c); + return this; + }); + } + + @Override + public PushStream<T> merge( + PushEventSource< ? extends T> source) { + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( + psp, defaultExecutor, scheduler, this); + AtomicInteger count = new AtomicInteger(2); + PushEventConsumer<T> consumer = event -> { + try { + if (!event.isTerminal()) { + return eventStream.handleEvent(event); + } + + if (count.decrementAndGet() == 0) { + eventStream.handleEvent(event.nodata()); + return ABORT; + } + return CONTINUE; + } catch (Exception e) { + PushEvent<T> error = PushEvent.error(e); + close(error); + eventStream.close(event.nodata()); + return ABORT; + } + }; + updateNext(consumer); + AutoCloseable second; + try { + second = source.open((PushEvent< ? extends T> event) -> { + return consumer.accept(event); + }); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + throw new IllegalStateException( + "Unable to merge events as the event source could not be opened.", + e); + } + + return eventStream.onClose(() -> { + try { + second.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }).map(Function.identity()); + } + + @Override + public PushStream<T> merge(PushStream< ? extends T> source) { + + AtomicInteger count = new AtomicInteger(2); + Consumer<AbstractPushStreamImpl<T>> start = downstream -> { + PushEventConsumer<T> consumer = e -> { + long toReturn; + try { + if (!e.isTerminal()) { + toReturn = downstream.handleEvent(e); + } else if (count.decrementAndGet() == 0) { + downstream.handleEvent(e); + toReturn = ABORT; + } else { + return ABORT; + } + } catch (Exception ex) { + try { + downstream.handleEvent(PushEvent.error(ex)); + } catch (Exception ex2) { /* Just ignore this */} + toReturn = ABORT; + } + if (toReturn < 0) { + try { + close(); + } catch (Exception ex2) { /* Just ignore this */} + try { + source.close(); + } catch (Exception ex2) { /* Just ignore this */} + } + return toReturn; + }; + forEachEvent(consumer); + source.forEachEvent(consumer); + }; + + @SuppressWarnings("resource") + AbstractPushStreamImpl<T> eventStream = new AbstractPushStreamImpl<T>( + psp, defaultExecutor, scheduler) { + @Override + protected boolean begin() { + if (closed.compareAndSet(BUILDING, STARTED)) { + start.accept(this); + return true; + } + return false; + } + }; + + + return eventStream.onClose(() -> { + try { + close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + try { + source.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }).map(Function.identity()); + } + + @SuppressWarnings("unchecked") + @Override + public PushStream<T>[] split(Predicate< ? super T>... predicates) { + Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length); + AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length]; + for(int i = 0; i < tests.length; i++) { + rsult[i] = new IntermediatePushStreamImpl<>(psp, defaultExecutor, + scheduler, this); + } + + Boolean[] array = new Boolean[tests.length]; + Arrays.fill(array, Boolean.TRUE); + AtomicReferenceArray<Boolean> off = new AtomicReferenceArray<>(array); + + AtomicInteger count = new AtomicInteger(tests.length); + updateNext(event -> { + if (!event.isTerminal()) { + long delay = CONTINUE; + for (int i = 0; i < tests.length; i++) { + try { + if (off.get(i).booleanValue() + && tests[i].test(event.getData())) { + long accept = rsult[i].handleEvent(event); + if (accept < 0) { + off.set(i, Boolean.TRUE); + count.decrementAndGet(); + } else if (accept > delay) { + accept = delay; + } + } + } catch (Exception e) { + try { + rsult[i].close(PushEvent.error(e)); + } catch (Exception e2) { + //TODO log + } + off.set(i, Boolean.TRUE); + } + } + if (count.get() == 0) + return ABORT; + + return delay; + } + for (AbstractPushStreamImpl<T> as : rsult) { + try { + as.handleEvent(event.nodata()); + } catch (Exception e) { + try { + as.close(PushEvent.error(e)); + } catch (Exception e2) { + //TODO log + } + } + } + return ABORT; + }); + return Arrays.copyOf(rsult, tests.length); + } + + @Override + public PushStream<T> sequential() { + AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>( + psp, defaultExecutor, scheduler, this); + Lock lock = new ReentrantLock(); + updateNext((event) -> { + try { + lock.lock(); + try { + return eventStream.handleEvent(event); + } finally { + lock.unlock(); + } + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override + public <R> PushStream<R> coalesce( + Function< ? super T,Optional<R>> accumulator) { + AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>( + psp, defaultExecutor, scheduler, this); + updateNext((event) -> { + try { + if (!event.isTerminal()) { + Optional<PushEvent<R>> coalesced = accumulator + .apply(event.getData()).map(PushEvent::data); + if (coalesced.isPresent()) { + try { + return eventStream.handleEvent(coalesced.get()); + } catch (Exception ex) { + close(PushEvent.error(ex)); + return ABORT; + } + } else { + return CONTINUE; + } + } + return eventStream.handleEvent(event.nodata()); + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + @Override + public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f) { + if (count <= 0) + throw new IllegalArgumentException( + "A coalesce operation must collect a positive number of events"); + // This could be optimised to only use a single collection queue. + // It would save some GC, but is it worth it? + return coalesce(() -> count, f); + } + + @Override + public <R> PushStream<R> coalesce(IntSupplier count, + Function<Collection<T>,R> f) { + AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>( + null); + + Runnable init = () -> queueRef + .set(getQueueForInternalBuffering(count.getAsInt())); + + @SuppressWarnings("resource") + AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>( + psp, defaultExecutor, scheduler, this) { + @Override + protected void beginning() { + init.run(); + } + }; + + AtomicBoolean endPending = new AtomicBoolean(); + Object lock = new Object(); + updateNext((event) -> { + try { + Queue<T> queue; + if (!event.isTerminal()) { + synchronized (lock) { + for (;;) { + queue = queueRef.get(); + if (queue == null) { + if (endPending.get()) { + return ABORT; + } else { + continue; + } + } else if (queue.offer(event.getData())) { + return CONTINUE; + } else { + queueRef.lazySet(null); + break; + } + } + } + + queueRef.set( + getQueueForInternalBuffering(count.getAsInt())); + + // This call is on the same thread and so must happen + // outside + // the synchronized block. + return aggregateAndForward(f, eventStream, event, + queue); + } else { + synchronized (lock) { + queue = queueRef.get(); + queueRef.lazySet(null); + endPending.set(true); + } + if (queue != null) { + eventStream.handleEvent( + PushEvent.data(f.apply(queue))); + } + } + return eventStream.handleEvent(event.nodata()); + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + private <R> long aggregateAndForward(Function<Collection<T>,R> f, + AbstractPushStreamImpl<R> eventStream, + PushEvent< ? extends T> event, Queue<T> queue) { + if (!queue.offer(event.getData())) { + ((ArrayQueue<T>) queue).forcePush(event.getData()); + } + return eventStream.handleEvent(PushEvent.data(f.apply(queue))); + } + + + @Override + public <R> PushStream<R> window(Duration time, + Function<Collection<T>,R> f) { + return window(time, defaultExecutor, f); + } + + @Override + public <R> PushStream<R> window(Duration time, Executor executor, + Function<Collection<T>,R> f) { + return window(() -> time, () -> 0, executor, (t, c) -> f.apply(c)); + } + + @Override + public <R> PushStream<R> window(Supplier<Duration> time, + IntSupplier maxEvents, + BiFunction<Long,Collection<T>,R> f) { + return window(time, maxEvents, defaultExecutor, f); + } + + @Override + public <R> PushStream<R> window(Supplier<Duration> time, + IntSupplier maxEvents, Executor ex, + BiFunction<Long,Collection<T>,R> f) { + + AtomicLong timestamp = new AtomicLong(); + AtomicLong previousWindowSize = new AtomicLong(); + AtomicLong counter = new AtomicLong(); + Object lock = new Object(); + AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>( + null); + + // This code is declared as a separate block to avoid any confusion + // about which instance's methods and variables are in scope + Consumer<AbstractPushStreamImpl<R>> begin = p -> { + + synchronized (lock) { + timestamp.lazySet(System.nanoTime()); + long count = counter.get(); + + + long windowSize = time.get().toNanos(); + previousWindowSize.set(windowSize); + scheduler.schedule( + getWindowTask(p, f, time, maxEvents, lock, count, + queueRef, timestamp, counter, + previousWindowSize, ex), + windowSize, NANOSECONDS); + } + + queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt())); + }; + + @SuppressWarnings("resource") + AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>( + psp, ex, scheduler, this) { + @Override + protected void beginning() { + begin.accept(this); + } + }; + + AtomicBoolean endPending = new AtomicBoolean(false); + updateNext((event) -> { + try { + if (eventStream.closed.get() == CLOSED) { + return ABORT; + } + Queue<T> queue; + if (!event.isTerminal()) { + long elapsed; + long newCount; + synchronized (lock) { + for (;;) { + queue = queueRef.get(); + if (queue == null) { + if (endPending.get()) { + return ABORT; + } else { + continue; + } + } else if (queue.offer(event.getData())) { + return CONTINUE; + } else { + queueRef.lazySet(null); + break; + } + } + + long now = System.nanoTime(); + elapsed = now - timestamp.get(); + timestamp.lazySet(now); + newCount = counter.get() + 1; + counter.lazySet(newCount); + + // This is a non-blocking call, and must happen in the + // synchronized block to avoid re=ordering the executor + // enqueue with a subsequent incoming close operation + aggregateAndForward(f, eventStream, event, queue, + ex, elapsed); + } + // These must happen outside the synchronized block as we + // call out to user code + queueRef.set( + getQueueForInternalBuffering(maxEvents.getAsInt())); + long nextWindow = time.get().toNanos(); + long backpressure = previousWindowSize.getAndSet(nextWindow) + - elapsed; + scheduler.schedule( + getWindowTask(eventStream, f, time, maxEvents, lock, + newCount, queueRef, timestamp, counter, + previousWindowSize, ex), + nextWindow, NANOSECONDS); + + return backpressure < 0 ? CONTINUE + : NANOSECONDS.toMillis(backpressure); + } else { + long elapsed; + synchronized (lock) { + queue = queueRef.get(); + queueRef.lazySet(null); + endPending.set(true); + long now = System.nanoTime(); + elapsed = now - timestamp.get(); + counter.lazySet(counter.get() + 1); + } + Collection<T> collected = queue == null ? emptyList() + : queue; + ex.execute(() -> { + try { + eventStream + .handleEvent(PushEvent.data(f.apply( + Long.valueOf(NANOSECONDS + .toMillis(elapsed)), + collected))); + } catch (Exception e) { + close(PushEvent.error(e)); + } + }); + } + ex.execute(() -> eventStream.handleEvent(event.nodata())); + return ABORT; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + return eventStream; + } + + protected Queue<T> getQueueForInternalBuffering(int size) { + if (size == 0) { + return new LinkedList<T>(); + } else { + return new ArrayQueue<>(size - 1); + } + } + + @SuppressWarnings("unchecked") + /** + * A special queue that keeps one element in reserve and can have that last + * element set using forcePush. After the element is set the capacity is + * permanently increased by one and cannot grow further. + * + * @param <E> The element type + */ + private static class ArrayQueue<E> extends AbstractQueue<E> + implements Queue<E> { + + final Object[] store; + + int normalLength; + + int nextIndex; + + int size; + + ArrayQueue(int capacity) { + store = new Object[capacity + 1]; + normalLength = store.length - 1; + } + + @Override + public boolean offer(E e) { + if (e == null) + throw new NullPointerException("Null values are not supported"); + if (size < normalLength) { + store[nextIndex] = e; + size++; + nextIndex++; + nextIndex = nextIndex % normalLength; + return true; + } + return false; + } + + public void forcePush(E e) { + store[normalLength] = e; + normalLength++; + size++; + } + + @Override + public E poll() { + if (size == 0) { + return null; + } else { + int idx = nextIndex - size; + if (idx < 0) { + idx += normalLength; + } + E value = (E) store[idx]; + store[idx] = null; + size--; + return value; + } + } + + @Override + public E peek() { + if (size == 0) { + return null; + } else { + int idx = nextIndex - size; + if (idx < 0) { + idx += normalLength; + } + return (E) store[idx]; + } + } + + @Override + public Iterator<E> iterator() { + final int previousNext = nextIndex; + return new Iterator<E>() { + + int idx; + + int remaining = size; + + { + idx = nextIndex - size; + if (idx < 0) { + idx += normalLength; + } + } + + @Override + public boolean hasNext() { + if (nextIndex != previousNext) { + throw new ConcurrentModificationException( + "The queue was concurrently modified"); + } + return remaining > 0; + } + + @Override + public E next() { + if (!hasNext()) { + throw new NoSuchElementException( + "The iterator has no more values"); + } + E value = (E) store[idx]; + idx++; + remaining--; + if (idx == normalLength) { + idx = 0; + } + return value; + } + + }; + } + + @Override + public int size() { + return size; + } + + } + + private <R> Runnable getWindowTask(AbstractPushStreamImpl<R> eventStream, + BiFunction<Long,Collection<T>,R> f, Supplier<Duration> time, + IntSupplier maxEvents, Object lock, long expectedCounter, + AtomicReference<Queue<T>> queueRef, AtomicLong timestamp, + AtomicLong counter, AtomicLong previousWindowSize, + Executor executor) { + return () -> { + + Queue<T> queue = null; + long elapsed; + synchronized (lock) { + + if (counter.get() != expectedCounter) { + return; + } + counter.lazySet(expectedCounter + 1); + + long now = System.nanoTime(); + elapsed = now - timestamp.get(); + timestamp.lazySet(now); + + queue = queueRef.get(); + queueRef.lazySet(null); + + // This is a non-blocking call, and must happen in the + // synchronized block to avoid re=ordering the executor + // enqueue with a subsequent incoming close operation + + Collection<T> collected = queue == null ? emptyList() : queue; + executor.execute(() -> { + try { + eventStream.handleEvent(PushEvent.data(f.apply( + Long.valueOf(NANOSECONDS.toMillis(elapsed)), + collected))); + } catch (Exception e) { + close(PushEvent.error(e)); + } + }); + } + + // These must happen outside the synchronized block as we + // call out to user code + long nextWindow = time.get().toNanos(); + previousWindowSize.set(nextWindow); + queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt())); + scheduler.schedule( + getWindowTask(eventStream, f, time, maxEvents, lock, + expectedCounter + 1, queueRef, timestamp, counter, + previousWindowSize, executor), + nextWindow, NANOSECONDS); + }; + } + + private <R> void aggregateAndForward(BiFunction<Long,Collection<T>,R> f, + AbstractPushStreamImpl<R> eventStream, + PushEvent< ? extends T> event, Queue<T> queue, Executor executor, + long elapsed) { + executor.execute(() -> { + try { + if (!queue.offer(event.getData())) { + ((ArrayQueue<T>) queue).forcePush(event.getData()); + } + long result = eventStream.handleEvent(PushEvent.data( + f.apply(Long.valueOf(NANOSECONDS.toMillis(elapsed)), + queue))); + if (result < 0) { + close(); + } + } catch (Exception e) { + close(PushEvent.error(e)); + } + }); + } + + @Override + public Promise<Void> forEach(Consumer< ? super T> action) { + Deferred<Void> d = new Deferred<>(); + updateNext((event) -> { + try { + switch(event.getType()) { + case DATA: + action.accept(event.getData()); + return CONTINUE; + case CLOSE: + d.resolve(null); + break; + case ERROR: + d.fail(event.getFailure()); + break; + } + close(event.nodata()); + return ABORT; + } catch (Exception e) { + d.fail(e); + return ABORT; + } + }); + begin(); + return d.getPromise(); + } + + @Override + public Promise<Object[]> toArray() { + return collect(Collectors.toList()) + .map(List::toArray); + } + + @Override + public <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator) { + return collect(Collectors.toList()) + .map(l -> l.toArray(generator.apply(l.size()))); + } + + @Override + public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) { + Deferred<T> d = new Deferred<>(); + AtomicReference<T> iden = new AtomicReference<T>(identity); + + updateNext(event -> { + try { + switch(event.getType()) { + case DATA: + iden.accumulateAndGet(event.getData(), accumulator); + return CONTINUE; + case CLOSE: + d.resolve(iden.get()); + break; + case ERROR: + d.fail(event.getFailure()); + break; + } + close(event.nodata()); + return ABORT; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + begin(); + return d.getPromise(); + } + + @Override + public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) { + Deferred<Optional<T>> d = new Deferred<>(); + AtomicReference<T> iden = new AtomicReference<T>(null); + + updateNext(event -> { + try { + switch(event.getType()) { + case DATA: + if (!iden.compareAndSet(null, event.getData())) + iden.accumulateAndGet(event.getData(), accumulator); + return CONTINUE; + case CLOSE: + d.resolve(Optional.ofNullable(iden.get())); + break; + case ERROR: + d.fail(event.getFailure()); + break; + } + close(event.nodata()); + return ABORT; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + begin(); + return d.getPromise(); + } + + @Override + public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { + Deferred<U> d = new Deferred<>(); + AtomicReference<U> iden = new AtomicReference<>(identity); + + updateNext(event -> { + try { + switch(event.getType()) { + case DATA: + iden.updateAndGet((e) -> accumulator.apply(e, event.getData())); + return CONTINUE; + case CLOSE: + d.resolve(iden.get()); + break; + case ERROR: + d.fail(event.getFailure()); + break; + } + close(event.nodata()); + return ABORT; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + begin(); + return d.getPromise(); + } + + @Override + public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) { + A result = collector.supplier().get(); + BiConsumer<A, ? super T> accumulator = collector.accumulator(); + Deferred<R> d = new Deferred<>(); + PushEventConsumer<T> consumer; + + if (collector.characteristics() + .contains(Collector.Characteristics.CONCURRENT)) { + consumer = event -> { + try { + switch (event.getType()) { + case DATA : + accumulator.accept(result, event.getData()); + return CONTINUE; + case CLOSE : + d.resolve(collector.finisher().apply(result)); + break; + case ERROR : + d.fail(event.getFailure()); + break; + } + close(event.nodata()); + return ABORT; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }; + } else { + consumer = event -> { + try { + switch (event.getType()) { + case DATA : + synchronized (result) { + accumulator.accept(result, event.getData()); + } + return CONTINUE; + case CLOSE : + d.resolve(collector.finisher().apply(result)); + break; + case ERROR : + d.fail(event.getFailure()); + break; + } + close(event.nodata()); + return ABORT; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }; + } + + updateNext(consumer); + begin(); + return d.getPromise(); + } + + @Override + public Promise<Optional<T>> min(Comparator<? super T> comparator) { + return reduce((a, b) -> comparator.compare(a, b) <= 0 ? a : b); + } + + @Override + public Promise<Optional<T>> max(Comparator<? super T> comparator) { + return reduce((a, b) -> comparator.compare(a, b) > 0 ? a : b); + } + + @Override + public Promise<Long> count() { + Deferred<Long> d = new Deferred<>(); + LongAdder counter = new LongAdder(); + updateNext((event) -> { + try { + switch(event.getType()) { + case DATA: + counter.add(1); + return CONTINUE; + case CLOSE: + d.resolve(Long.valueOf(counter.sum())); + break; + case ERROR: + d.fail(event.getFailure()); + break; + } + close(event.nodata()); + return ABORT; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + begin(); + return d.getPromise(); + } + + @Override + public Promise<Boolean> anyMatch(Predicate<? super T> predicate) { + return filter(predicate).findAny() + .map(Optional::isPresent); + } + + @Override + public Promise<Boolean> allMatch(Predicate<? super T> predicate) { + return filter(x -> !predicate.test(x)).findAny() + .map(o -> Boolean.valueOf(!o.isPresent())); + } + + @Override + public Promise<Boolean> noneMatch(Predicate<? super T> predicate) { + return filter(predicate).findAny() + .map(o -> Boolean.valueOf(!o.isPresent())); + } + + @Override + public Promise<Optional<T>> findFirst() { + Deferred<Optional<T>> d = new Deferred<>(); + updateNext((event) -> { + try { + Optional<T> o = null; + switch(event.getType()) { + case DATA: + o = Optional.of(event.getData()); + break; + case CLOSE: + o = Optional.empty(); + break; + case ERROR: + d.fail(event.getFailure()); + return ABORT; + } + if(!d.getPromise().isDone()) + d.resolve(o); + return ABORT; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + begin(); + return d.getPromise(); + } + + @Override + public Promise<Optional<T>> findAny() { + return findFirst(); + } + + @Override + public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) { + Deferred<Long> d = new Deferred<>(); + LongAdder la = new LongAdder(); + updateNext((event) -> { + try { + switch(event.getType()) { + case DATA: + long value = action.accept(event); + la.add(value); + return value; + case CLOSE: + try { + action.accept(event); + } finally { + d.resolve(Long.valueOf(la.sum())); + } + break; + case ERROR: + try { + action.accept(event); + } finally { + d.fail(event.getFailure()); + } + break; + } + return ABORT; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + }); + begin(); + return d.getPromise(); + } + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferBuilder.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferBuilder.java new file mode 100644 index 000000000..2aa6ec763 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferBuilder.java @@ -0,0 +1,79 @@ +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +/** + * Create a buffered section of a Push-based stream + * + * @param <R> The type of object being built + * @param <T> The type of objects in the {@link PushEvent} + * @param <U> The type of the Queue used in the user specified buffer + */ +public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends T>>> { + + /** + * The BlockingQueue implementation to use as a buffer + * + * @param queue + * @return this builder + */ + BufferBuilder<R, T, U> withBuffer(U queue); + + /** + * Set the {@link QueuePolicy} of this Builder + * + * @param queuePolicy + * @return this builder + */ + BufferBuilder<R,T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy); + + /** + * Set the {@link QueuePolicy} of this Builder + * + * @param queuePolicyOption + * @return this builder + */ + BufferBuilder<R, T, U> withQueuePolicy(QueuePolicyOption queuePolicyOption); + + /** + * Set the {@link PushbackPolicy} of this builder + * + * @param pushbackPolicy + * @return this builder + */ + BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicy<T, U> pushbackPolicy); + + /** + * Set the {@link PushbackPolicy} of this builder + * + * @param pushbackPolicyOption + * @param time + * @return this builder + */ + BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicyOption pushbackPolicyOption, long time); + + /** + * Set the maximum permitted number of concurrent event deliveries allowed + * from this buffer + * + * @param parallelism + * @return this builder + */ + BufferBuilder<R, T, U> withParallelism(int parallelism); + + /** + * Set the {@link Executor} that should be used to deliver events from this + * buffer + * + * @param executor + * @return this builder + */ + BufferBuilder<R, T, U> withExecutor(Executor executor); + + /** + * @return the object being built + */ + R create(); + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java new file mode 100644 index 000000000..7cedafb5c --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java @@ -0,0 +1,111 @@ +package org.osgi.util.pushstream; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED; +import static org.osgi.util.pushstream.PushEventConsumer.ABORT; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> + extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> { + + private final U eventQueue; + + private final Semaphore semaphore; + + private final Executor worker; + + private final QueuePolicy<T, U> queuePolicy; + + private final PushbackPolicy<T, U> pushbackPolicy; + + /** + * Indicates that a terminal event has been received, that we should stop + * collecting new events, and that we must drain the buffer before + * continuing + */ + private final AtomicBoolean softClose = new AtomicBoolean(); + + private final int parallelism; + + BufferedPushStreamImpl(PushStreamProvider psp, + ScheduledExecutorService scheduler, U eventQueue, + int parallelism, Executor worker, QueuePolicy<T,U> queuePolicy, + PushbackPolicy<T,U> pushbackPolicy, + Function<PushEventConsumer<T>,AutoCloseable> connector) { + super(psp, worker, scheduler, connector); + this.eventQueue = eventQueue; + this.parallelism = parallelism; + this.semaphore = new Semaphore(parallelism); + this.worker = worker; + this.queuePolicy = queuePolicy; + this.pushbackPolicy = pushbackPolicy; + } + + @Override + protected long handleEvent(PushEvent< ? extends T> event) { + + // If we have already been soft closed, or hard closed then abort + if (!softClose.compareAndSet(false, event.isTerminal()) + || closed.get() == CLOSED) { + return ABORT; + } + + try { + queuePolicy.doOffer(eventQueue, event); + long backPressure = pushbackPolicy.pushback(eventQueue); + if(backPressure < 0) { + close(); + return ABORT; + } + if(semaphore.tryAcquire()) { + startWorker(); + } + return backPressure; + } catch (Exception e) { + close(PushEvent.error(e)); + return ABORT; + } + } + + private void startWorker() { + worker.execute(() -> { + try { + PushEvent< ? extends T> event; + while ((event = eventQueue.poll()) != null) { + if (event.isTerminal()) { + // Wait for the other threads to finish + semaphore.acquire(parallelism - 1); + } + + long backpressure = super.handleEvent(event); + if(backpressure < 0) { + close(); + return; + } else if(backpressure > 0) { + scheduler.schedule(this::startWorker, backpressure, + MILLISECONDS); + return; + } + } + + semaphore.release(); + } catch (Exception e) { + close(PushEvent.error(e)); + } + if(eventQueue.peek() != null && semaphore.tryAcquire()) { + try { + startWorker(); + } catch (Exception e) { + close(PushEvent.error(e)); + } + } + }); + + } +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java new file mode 100644 index 000000000..3a4da2fd9 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java @@ -0,0 +1,35 @@ +package org.osgi.util.pushstream; + +import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*; + +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T> + implements PushStream<T> { + + private final AbstractPushStreamImpl< ? > previous; + + IntermediatePushStreamImpl(PushStreamProvider psp, + Executor executor, ScheduledExecutorService scheduler, + AbstractPushStreamImpl< ? > previous) { + super(psp, executor, scheduler); + this.previous = previous; + } + + @Override + protected boolean begin() { + if(closed.compareAndSet(BUILDING, STARTED)) { + beginning(); + previous.begin(); + return true; + } + return false; + } + + protected void beginning() { + // The base implementation has nothing to do, but + // this method is used in windowing + } + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEvent.java new file mode 100644 index 000000000..028f0a392 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEvent.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import static org.osgi.util.pushstream.PushEvent.EventType.*; + +/** + * A PushEvent is an immutable object that is transferred through a + * communication channel to push information to a downstream consumer. The event + * has three different types: + * <ul> + * <li>{@link EventType#DATA} – Provides access to a typed data element in the + * stream. + * <li>{@link EventType#CLOSE} – The stream is closed. After receiving this + * event, no more events will follow. + * <li>{@link EventType#ERROR} – The stream ran into an unrecoverable problem + * and is sending the reason downstream. The stream is closed and no more events + * will follow after this event. + * </ul> + * + * @param <T> The payload type of the event. + * @Immutable + */ +public abstract class PushEvent<T> { + + /** + * The type of a {@link PushEvent}. + */ + public static enum EventType { + /** + * A data event forming part of the stream + */ + DATA, + /** + * An error event that indicates streaming has failed and that no more + * events will arrive + */ + ERROR, + /** + * An event that indicates that the stream has terminated normally + */ + CLOSE + } + + /** + * Package private default constructor. + */ + PushEvent() {} + + /** + * Get the type of this event. + * + * @return The type of this event. + */ + public abstract EventType getType(); + + /** + * Return the data for this event. + * + * @return The data payload. + * @throws IllegalStateException if this event is not a + * {@link EventType#DATA} event. + */ + public T getData() throws IllegalStateException { + throw new IllegalStateException( + "Not a DATA event, the event type is " + getType()); + } + + /** + * Return the error that terminated the stream. + * + * @return The error that terminated the stream. + * @throws IllegalStateException if this event is not an + * {@link EventType#ERROR} event. + */ + public Exception getFailure() throws IllegalStateException { + throw new IllegalStateException( + "Not an ERROR event, the event type is " + getType()); + } + + /** + * Answer if no more events will follow after this event. + * + * @return {@code false} if this is a data event, otherwise {@code true}. + */ + public boolean isTerminal() { + return true; + } + + /** + * Create a new data event. + * + * @param <T> The payload type. + * @param payload The payload. + * @return A new data event wrapping the specified payload. + */ + public static <T> PushEvent<T> data(T payload) { + return new DataEvent<T>(payload); + } + + /** + * Create a new error event. + * + * @param <T> The payload type. + * @param e The error. + * @return A new error event with the specified error. + */ + public static <T> PushEvent<T> error(Exception e) { + return new ErrorEvent<T>(e); + } + + /** + * Create a new close event. + * + * @param <T> The payload type. + * @return A new close event. + */ + public static <T> PushEvent<T> close() { + return new CloseEvent<T>(); + } + + /** + * Convenience to cast a close/error event to another payload type. Since + * the payload type is not needed for these events this is harmless. This + * therefore allows you to forward the close/error event downstream without + * creating anew event. + * + * @param <X> The new payload type. + * @return The current error or close event mapped to a new payload type. + * @throws IllegalStateException if the event is a {@link EventType#DATA} + * event. + */ + public <X> PushEvent<X> nodata() throws IllegalStateException { + @SuppressWarnings("unchecked") + PushEvent<X> result = (PushEvent<X>) this; + return result; + } + + static final class DataEvent<T> extends PushEvent<T> { + private final T data; + + DataEvent(T data) { + this.data = data; + } + + @Override + public T getData() throws IllegalStateException { + return data; + } + + @Override + public EventType getType() { + return DATA; + } + + @Override + public boolean isTerminal() { + return false; + } + + @Override + public <X> PushEvent<X> nodata() throws IllegalStateException { + throw new IllegalStateException("This event is a DATA event"); + } + } + + static final class ErrorEvent<T> extends PushEvent<T> { + private final Exception error; + + ErrorEvent(Exception error) { + this.error = error; + } + + @Override + public Exception getFailure() { + return error; + } + + @Override + public EventType getType() { + return ERROR; + } + } + + static final class CloseEvent<T> extends PushEvent<T> { + @Override + public EventType getType() { + return CLOSE; + } + } +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventConsumer.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventConsumer.java new file mode 100644 index 000000000..43de152ae --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventConsumer.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) OSGi Alliance (2015). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import org.osgi.annotation.versioning.ConsumerType; + +/** + * An Async Event Consumer asynchronously receives Data events until it receives + * either a Close or Error event. + * + * @param <T> + * The type for the event payload + */ +@ConsumerType +@FunctionalInterface +public interface PushEventConsumer<T> { + + /** + * If ABORT is used as return value, the sender should close the channel all + * the way to the upstream source. The ABORT will not guarantee that no + * more events are delivered since this is impossible in a concurrent + * environment. The consumer should accept subsequent events and close/clean + * up when the Close or Error event is received. + * + * Though ABORT has the value -1, any value less than 0 will act as an + * abort. + */ + long ABORT = -1; + + /** + * A 0 indicates that the consumer is willing to receive subsequent events + * at full speeds. + * + * Any value more than 0 will indicate that the consumer is becoming + * overloaded and wants a delay of the given milliseconds before the next + * event is sent. This allows the consumer to pushback the event delivery + * speed. + */ + long CONTINUE = 0; + + /** + * Accept an event from a source. Events can be delivered on multiple + * threads simultaneously. However, Close and Error events are the last + * events received, no more events must be sent after them. + * + * @param event The event + * @return less than 0 means abort, 0 means continue, more than 0 means + * delay ms + * @throws Exception to indicate that an error has occured and that no + * further events should be delivered to this + * {@link PushEventConsumer} + */ + long accept(PushEvent<? extends T> event) throws Exception; + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventSource.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventSource.java new file mode 100644 index 000000000..d43399d77 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventSource.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import org.osgi.annotation.versioning.ConsumerType; + +/** + * An event source. An event source can open a channel between a source and a + * consumer. Once the channel is opened (even before it returns) the source can + * send events to the consumer. + * + * A source should stop sending and automatically close the channel when sending + * an event returns a negative value, see {@link PushEventConsumer#ABORT}. + * Values that are larger than 0 should be treated as a request to delay the + * next events with those number of milliseconds. + * + * @param <T> + * The payload type + */ +@ConsumerType +@FunctionalInterface +public interface PushEventSource<T> { + + /** + * Open the asynchronous channel between the source and the consumer. The + * call returns an {@link AutoCloseable}. This can be closed, and should + * close the channel, including sending a Close event if the channel was not + * already closed. The returned object must be able to be closed multiple + * times without sending more than one Close events. + * + * @param aec the consumer (not null) + * @return a {@link AutoCloseable} that can be used to close the stream + * @throws Exception + */ + AutoCloseable open(PushEventConsumer< ? super T> aec) throws Exception; +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java new file mode 100644 index 000000000..c26bc8c4d --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java @@ -0,0 +1,609 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import java.time.Duration; +import java.util.Collection; +import java.util.Comparator; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.IntSupplier; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collector; + +import org.osgi.annotation.versioning.ProviderType; +import org.osgi.util.promise.Promise; +import org.osgi.util.promise.TimeoutException; + +/** + * A Push Stream fulfills the same role as the Java 8 stream but it reverses the + * control direction. The Java 8 stream is pull based and this is push based. A + * Push Stream makes it possible to build a pipeline of transformations using a + * builder kind of model. Just like streams, it provides a number of terminating + * methods that will actually open the channel and perform the processing until + * the channel is closed (The source sends a Close event). The results of the + * processing will be send to a Promise, just like any error events. A stream + * can be used multiple times. The Push Stream represents a pipeline. Upstream + * is in the direction of the source, downstream is in the direction of the + * terminating method. Events are sent downstream asynchronously with no + * guarantee for ordering or concurrency. Methods are available to provide + * serialization of the events and splitting in background threads. + * + * @param <T> The Payload type + */ +@ProviderType +public interface PushStream<T> extends AutoCloseable { + + /** + * Must be run after the channel is closed. This handler will run after the + * downstream methods have processed the close event and before the upstream + * methods have closed. + * + * @param closeHandler Will be called on close + * @return This stream + */ + PushStream<T> onClose(Runnable closeHandler); + + /** + * Must be run after the channel is closed. This handler will run after the + * downstream methods have processed the close event and before the upstream + * methods have closed. + * + * @param closeHandler Will be called on close + * @return This stream + */ + PushStream<T> onError(Consumer< ? super Throwable> closeHandler); + + /** + * Only pass events downstream when the predicate tests true. + * + * @param predicate The predicate that is tested (not null) + * @return Builder style (can be a new or the same object) + */ + PushStream<T> filter(Predicate< ? super T> predicate); + + /** + * Map a payload value. + * + * @param mapper The map function + * @return Builder style (can be a new or the same object) + */ + <R> PushStream<R> map(Function< ? super T, ? extends R> mapper); + + /** + * Flat map the payload value (turn one event into 0..n events of + * potentially another type). + * + * @param mapper The flat map function + * @return Builder style (can be a new or the same object) + */ + <R> PushStream<R> flatMap( + Function< ? super T, ? extends PushStream< ? extends R>> mapper); + + /** + * Remove any duplicates. Notice that this can be expensive in a large + * stream since it must track previous payloads. + * + * @return Builder style (can be a new or the same object) + */ + PushStream<T> distinct(); + + /** + * Sorted the elements, assuming that T extends Comparable. This is of + * course expensive for large or infinite streams since it requires + * buffering the stream until close. + * + * @return Builder style (can be a new or the same object) + */ + PushStream<T> sorted(); + + /** + * Sorted the elements with the given comparator. This is of course + * expensive for large or infinite streams since it requires buffering the + * stream until close. + * + * @param comparator + * @return Builder style (can be a new or the same object) + */ + PushStream<T> sorted(Comparator< ? super T> comparator); + + /** + * Automatically close the channel after the maxSize number of elements is + * received. + * + * @param maxSize Maximum number of elements has been received + * @return Builder style (can be a new or the same object) + */ + PushStream<T> limit(long maxSize); + + /** + * Automatically close the channel after the given amount of time has + * elapsed. + * + * @param maxTime The maximum time that the stream should remain open + * @return Builder style (can be a new or the same object) + */ + PushStream<T> limit(Duration maxTime); + + /** + * Automatically fail the channel if no events are received for the + * indicated length of time. If the timeout is reached then a failure event + * containing a {@link TimeoutException} will be sent. + * + * @param idleTime The length of time that the stream should remain open + * when no events are being received. + * @return Builder style (can be a new or the same object) + */ + PushStream<T> timeout(Duration idleTime); + + /** + * Skip a number of events in the channel. + * + * @param n number of elements to skip + * @throws IllegalArgumentException if the number of events to skip is + * negative + * @return Builder style (can be a new or the same object) + */ + PushStream<T> skip(long n); + + /** + * Execute the downstream events in up to n background threads. If more + * requests are outstanding apply delay * nr of delayed threads back + * pressure. A downstream channel that is closed or throws an exception will + * cause all execution to cease and the stream to close + * + * @param n number of simultaneous background threads to use + * @param delay Nr of ms/thread that is queued back pressure + * @param e an executor to use for the background threads. + * @return Builder style (can be a new or the same object) + * @throws IllegalArgumentException if the number of threads is < 1 or the + * delay is < 0 + * @throws NullPointerException if the Executor is null + */ + PushStream<T> fork(int n, int delay, Executor e) + throws IllegalArgumentException, NullPointerException; + + /** + * Buffer the events in a queue using default values for the queue size and + * other behaviours. Buffered work will be processed asynchronously in the + * rest of the chain. Buffering also blocks the transmission of back + * pressure to previous elements in the chain, although back pressure is + * honoured by the buffer. + * <p> + * Buffers are useful for "bursty" event sources which produce a number of + * events close together, then none for some time. These bursts can + * sometimes overwhelm downstream event consumers. Buffering will not, + * however, protect downstream components from a source which produces + * events faster than they can be consumed. For fast sources + * {@link #filter(Predicate)} and {@link #coalesce(int, Function)} + * {@link #fork(int, int, Executor)} are better choices. + * + * @return Builder style (can be a new or the same object) + */ + PushStream<T> buffer(); + + /** + * Build a buffer to enqueue events in a queue using custom values for the + * queue size and other behaviours. Buffered work will be processed + * asynchronously in the rest of the chain. Buffering also blocks the + * transmission of back pressure to previous elements in the chain, although + * back pressure is honoured by the buffer. + * <p> + * Buffers are useful for "bursty" event sources which produce a number of + * events close together, then none for some time. These bursts can + * sometimes overwhelm downstream event consumers. Buffering will not, + * however, protect downstream components from a source which produces + * events faster than they can be consumed. For fast sources + * {@link #filter(Predicate)} and {@link #coalesce(int, Function)} + * {@link #fork(int, int, Executor)} are better choices. + * <p> + * Buffers are also useful as "circuit breakers" in the pipeline. If a + * {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger + * the stream to close, preventing an event storm from reaching the client. + * + * @param parallelism + * @param executor + * @param queue + * @param queuePolicy + * @param pushbackPolicy + * @return Builder style (can be a new or the same object) + */ + <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer(); + + /** + * Merge in the events from another source. The resulting channel is not + * closed until this channel and the channel from the source are closed. + * + * @param source The source to merge in. + * @return Builder style (can be a new or the same object) + */ + PushStream<T> merge(PushEventSource< ? extends T> source); + + /** + * Merge in the events from another PushStream. The resulting channel is not + * closed until this channel and the channel from the source are closed. + * + * @param source The source to merge in. + * @return Builder style (can be a new or the same object) + */ + PushStream<T> merge(PushStream< ? extends T> source); + + /** + * Split the events to different streams based on a predicate. If the + * predicate is true, the event is dispatched to that channel on the same + * position. All predicates are tested for every event. + * <p> + * This method differs from other methods of AsyncStream in three + * significant ways: + * <ul> + * <li>The return value contains multiple streams.</li> + * <li>This stream will only close when all of these child streams have + * closed.</li> + * <li>Event delivery is made to all open children that accept the event. + * </li> + * </ul> + * + * @param predicates the predicates to test + * @return streams that map to the predicates + */ + @SuppressWarnings("unchecked") + PushStream<T>[] split(Predicate< ? super T>... predicates); + + /** + * Ensure that any events are delivered sequentially. That is, no + * overlapping calls downstream. This can be used to turn a forked stream + * (where for example a heavy conversion is done in multiple threads) back + * into a sequential stream so a reduce is simple to do. + * + * @return Builder style (can be a new or the same object) + */ + PushStream<T> sequential(); + + /** + * Coalesces a number of events into a new type of event. The input events + * are forwarded to a accumulator function. This function returns an + * Optional. If the optional is present, it's value is send downstream, + * otherwise it is ignored. + * + * @param f + * @return Builder style (can be a new or the same object) + */ + <R> PushStream<R> coalesce(Function< ? super T,Optional<R>> f); + + /** + * Coalesces a number of events into a new type of event. A fixed number of + * input events are forwarded to a accumulator function. This function + * returns new event data to be forwarded on. + * + * @param count + * @param f + * @return Builder style (can be a new or the same object) + */ + public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f); + + /** + * Coalesces a number of events into a new type of event. A variable number + * of input events are forwarded to a accumulator function. The number of + * events to be forwarded is determined by calling the count function. The + * accumulator function then returns new event data to be forwarded on. + * + * @param count + * @param f + * @return Builder style (can be a new or the same object) + */ + public <R> PushStream<R> coalesce(IntSupplier count, + Function<Collection<T>,R> f); + + /** + * Buffers a number of events over a fixed time interval and then forwards + * the events to an accumulator function. This function returns new event + * data to be forwarded on. Note that: + * <ul> + * <li>The collection forwarded to the accumulator function will be empty if + * no events arrived during the time interval.</li> + * <li>The accumulator function will be run and the forwarded event + * delivered as a different task, (and therefore potentially on a different + * thread) from the one that delivered the event to this {@link PushStream}. + * </li> + * <li>Due to the buffering and asynchronous delivery required, this method + * prevents the propagation of back-pressure to earlier stages</li> + * </ul> + * + * @param d + * @param f + * @return Builder style (can be a new or the same object) + */ + <R> PushStream<R> window(Duration d, Function<Collection<T>,R> f); + + /** + * Buffers a number of events over a fixed time interval and then forwards + * the events to an accumulator function. This function returns new event + * data to be forwarded on. Note that: + * <ul> + * <li>The collection forwarded to the accumulator function will be empty if + * no events arrived during the time interval.</li> + * <li>The accumulator function will be run and the forwarded event + * delivered by a task given to the supplied executor.</li> + * <li>Due to the buffering and asynchronous delivery required, this method + * prevents the propagation of back-pressure to earlier stages</li> + * </ul> + * + * @param d + * @param executor + * @param f + * @return Builder style (can be a new or the same object) + */ + <R> PushStream<R> window(Duration d, Executor executor, + Function<Collection<T>,R> f); + + /** + * Buffers a number of events over a variable time interval and then + * forwards the events to an accumulator function. The length of time over + * which events are buffered is determined by the time function. A maximum + * number of events can also be requested, if this number of events is + * reached then the accumulator will be called early. The accumulator + * function returns new event data to be forwarded on. It is also given the + * length of time for which the buffer accumulated data. This may be less + * than the requested interval if the buffer reached the maximum number of + * requested events early. Note that: + * <ul> + * <li>The collection forwarded to the accumulator function will be empty if + * no events arrived during the time interval.</li> + * <li>The accumulator function will be run and the forwarded event + * delivered as a different task, (and therefore potentially on a different + * thread) from the one that delivered the event to this {@link PushStream}. + * </li> + * <li>Due to the buffering and asynchronous delivery required, this method + * prevents the propagation of back-pressure to earlier stages</li> + * <li>If the window finishes by hitting the maximum number of events then + * the remaining time in the window will be applied as back-pressure to the + * previous stage, attempting to slow the producer to the expected windowing + * threshold.</li> + * </ul> + * + * @param timeSupplier + * @param maxEvents + * @param f + * @return Builder style (can be a new or the same object) + */ + <R> PushStream<R> window(Supplier<Duration> timeSupplier, + IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> f); + + /** + * Buffers a number of events over a variable time interval and then + * forwards the events to an accumulator function. The length of time over + * which events are buffered is determined by the time function. A maximum + * number of events can also be requested, if this number of events is + * reached then the accumulator will be called early. The accumulator + * function returns new event data to be forwarded on. It is also given the + * length of time for which the buffer accumulated data. This may be less + * than the requested interval if the buffer reached the maximum number of + * requested events early. Note that: + * <ul> + * <li>The collection forwarded to the accumulator function will be empty if + * no events arrived during the time interval.</li> + * <li>The accumulator function will be run and the forwarded event + * delivered as a different task, (and therefore potentially on a different + * thread) from the one that delivered the event to this {@link PushStream}. + * </li> + * <li>If the window finishes by hitting the maximum number of events then + * the remaining time in the window will be applied as back-pressure to the + * previous stage, attempting to slow the producer to the expected windowing + * threshold.</li> + * </ul> + * + * @param timeSupplier + * @param maxEvents + * @param executor + * @param f + * @return Builder style (can be a new or the same object) + */ + <R> PushStream<R> window(Supplier<Duration> timeSupplier, + IntSupplier maxEvents, Executor executor, + BiFunction<Long,Collection<T>,R> f); + + /** + * Execute the action for each event received until the channel is closed. + * This is a terminating method, the returned promise is resolved when the + * channel closes. + * <p> + * This is a <strong>terminal operation</strong> + * + * @param action The action to perform + * @return A promise that is resolved when the channel closes. + */ + Promise<Void> forEach(Consumer< ? super T> action); + + /** + * Collect the payloads in an Object array after the channel is closed. This + * is a terminating method, the returned promise is resolved when the + * channel is closed. + * <p> + * This is a <strong>terminal operation</strong> + * + * @return A promise that is resolved with all the payloads received over + * the channel + */ + Promise<Object[]> toArray(); + + /** + * Collect the payloads in an Object array after the channel is closed. This + * is a terminating method, the returned promise is resolved when the + * channel is closed. The type of the array is handled by the caller using a + * generator function that gets the length of the desired array. + * <p> + * This is a <strong>terminal operation</strong> + * + * @param generator + * @return A promise that is resolved with all the payloads received over + * the channel + */ + <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator); + + /** + * Standard reduce, see Stream. The returned promise will be resolved when + * the channel closes. + * <p> + * This is a <strong>terminal operation</strong> + * + * @param identity The identity/begin value + * @param accumulator The accumulator + * @return A + */ + Promise<T> reduce(T identity, BinaryOperator<T> accumulator); + + /** + * Standard reduce without identity, so the return is an Optional. The + * returned promise will be resolved when the channel closes. + * <p> + * This is a <strong>terminal operation</strong> + * + * @param accumulator The accumulator + * @return an Optional + */ + Promise<Optional<T>> reduce(BinaryOperator<T> accumulator); + + /** + * Standard reduce with identity, accumulator and combiner. The returned + * promise will be resolved when the channel closes. + * <p> + * This is a <strong>terminal operation</strong> + * + * @param identity + * @param accumulator + * @param combiner combines to U's into one U (e.g. how combine two lists) + * @return The promise + */ + <U> Promise<U> reduce(U identity, BiFunction<U, ? super T,U> accumulator, + BinaryOperator<U> combiner); + + /** + * See Stream. Will resolve onces the channel closes. + * <p> + * This is a <strong>terminal operation</strong> + * + * @param collector + * @return A Promise representing the collected results + */ + <R, A> Promise<R> collect(Collector< ? super T,A,R> collector); + + /** + * See Stream. Will resolve onces the channel closes. + * <p> + * This is a <strong>terminal operation</strong> + * + * @param comparator + * @return A Promise representing the minimum value, or null if no values + * are seen before the end of the stream + */ + Promise<Optional<T>> min(Comparator< ? super T> comparator); + + /** + * See Stream. Will resolve onces the channel closes. + * <p> + * This is a <strong>terminal operation</strong> + * + * @param comparator + * @return A Promise representing the maximum value, or null if no values + * are seen before the end of the stream + */ + Promise<Optional<T>> max(Comparator< ? super T> comparator); + + /** + * See Stream. Will resolve onces the channel closes. + * <p> + * This is a <strong>terminal operation</strong> + * + * @return A Promise representing the number of values in the stream + */ + Promise<Long> count(); + + /** + * Close the channel and resolve the promise with true when the predicate + * matches a payload. If the channel is closed before the predicate matches, + * the promise is resolved with false. + * <p> + * This is a <strong>short circuiting terminal operation</strong> + * + * @param predicate + * @return A Promise that will resolve when an event matches the predicate, + * or the end of the stream is reached + */ + Promise<Boolean> anyMatch(Predicate< ? super T> predicate); + + /** + * Closes the channel and resolve the promise with false when the predicate + * does not matches a pay load.If the channel is closed before, the promise + * is resolved with true. + * <p> + * This is a <strong>short circuiting terminal operation</strong> + * + * @param predicate + * @return A Promise that will resolve when an event fails to match the + * predicate, or the end of the stream is reached + */ + Promise<Boolean> allMatch(Predicate< ? super T> predicate); + + /** + * Closes the channel and resolve the promise with false when the predicate + * matches any pay load. If the channel is closed before, the promise is + * resolved with true. + * <p> + * This is a <strong>short circuiting terminal operation</strong> + * + * @param predicate + * @return A Promise that will resolve when an event matches the predicate, + * or the end of the stream is reached + */ + Promise<Boolean> noneMatch(Predicate< ? super T> predicate); + + /** + * Close the channel and resolve the promise with the first element. If the + * channel is closed before, the Optional will have no value. + * + * @return a promise + */ + Promise<Optional<T>> findFirst(); + + /** + * Close the channel and resolve the promise with the first element. If the + * channel is closed before, the Optional will have no value. + * <p> + * This is a <strong>terminal operation</strong> + * + * @return a promise + */ + Promise<Optional<T>> findAny(); + + /** + * Pass on each event to another consumer until the stream is closed. + * <p> + * This is a <strong>terminal operation</strong> + * + * @param action + * @return a promise + */ + Promise<Long> forEachEvent(PushEventConsumer< ? super T> action); + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilder.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilder.java new file mode 100644 index 000000000..d59c8d9d3 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilder.java @@ -0,0 +1,52 @@ +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +/** + * A Builder for a PushStream. This Builder extends the support of a standard + * BufferBuilder by allowing the PushStream to be unbuffered. + * + * + * @param <T> The type of objects in the {@link PushEvent} + * @param <U> The type of the Queue used in the user specified buffer + */ +public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? extends T>>> + extends BufferBuilder<PushStream<T>,T,U> { + + /** + * Tells this {@link PushStreamBuilder} to create an unbuffered stream which + * delivers events directly to its consumer using the incoming delivery + * thread. + * + * @return the builder + */ + PushStreamBuilder<T,U> unbuffered(); + + /* + * Overridden methods to allow the covariant return of a PushStreamBuilder + */ + + @Override + PushStreamBuilder<T,U> withBuffer(U queue); + + @Override + PushStreamBuilder<T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy); + + @Override + PushStreamBuilder<T,U> withQueuePolicy(QueuePolicyOption queuePolicyOption); + + @Override + PushStreamBuilder<T,U> withPushbackPolicy( + PushbackPolicy<T,U> pushbackPolicy); + + @Override + PushStreamBuilder<T,U> withPushbackPolicy( + PushbackPolicyOption pushbackPolicyOption, long time); + + @Override + PushStreamBuilder<T,U> withParallelism(int parallelism); + + @Override + PushStreamBuilder<T,U> withExecutor(Executor executor); +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java new file mode 100644 index 000000000..5ec7cb336 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java @@ -0,0 +1,88 @@ +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> + extends AbstractBufferBuilder<PushStream<T>,T,U> + implements PushStreamBuilder<T,U> { + + private final PushStreamProvider psp; + private final PushEventSource<T> eventSource; + private final Executor previousExecutor; + + private boolean unbuffered; + + PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor, + PushEventSource<T> eventSource) { + this.psp = psp; + this.previousExecutor = defaultExecutor; + this.eventSource = eventSource; + this.worker = defaultExecutor; + } + + @Override + public PushStreamBuilder<T,U> withBuffer(U queue) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withBuffer(queue); + } + + @Override + public PushStreamBuilder<T,U> withQueuePolicy( + QueuePolicy<T,U> queuePolicy) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withQueuePolicy(queuePolicy); + } + + @Override + public PushStreamBuilder<T,U> withQueuePolicy( + QueuePolicyOption queuePolicyOption) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withQueuePolicy( + queuePolicyOption); + } + + @Override + public PushStreamBuilder<T,U> withPushbackPolicy( + PushbackPolicy<T,U> pushbackPolicy) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withPushbackPolicy( + pushbackPolicy); + } + + @Override + public PushStreamBuilder<T,U> withPushbackPolicy( + PushbackPolicyOption pushbackPolicyOption, long time) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withPushbackPolicy( + pushbackPolicyOption, time); + } + + @Override + public PushStreamBuilder<T,U> withParallelism(int parallelism) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withParallelism(parallelism); + } + + @Override + public PushStreamBuilder<T,U> withExecutor(Executor executor) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withExecutor(executor); + } + + @Override + public PushStreamBuilder<T,U> unbuffered() { + unbuffered = true; + return this; + } + + @Override + public PushStream<T> create() { + if (unbuffered) { + return psp.createUnbufferedStream(eventSource, previousExecutor); + } else { + return psp.createStream(eventSource, concurrency, worker, buffer, + bufferingPolicy, backPressure); + } + } +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java new file mode 100644 index 000000000..be87c6bce --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java @@ -0,0 +1,581 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED; +import static org.osgi.util.pushstream.PushEvent.*; +import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR; +import static org.osgi.util.pushstream.QueuePolicyOption.FAIL; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * A factory for {@link PushStream} instances, and utility methods for handling + * {@link PushEventSource}s and {@link PushEventConsumer}s + */ +public final class PushStreamProvider { + + private final Lock lock = new ReentrantLock(true); + + private int schedulerReferences; + + private ScheduledExecutorService scheduler; + + private ScheduledExecutorService acquireScheduler() { + try { + lock.lockInterruptibly(); + try { + schedulerReferences += 1; + + if (schedulerReferences == 1) { + scheduler = Executors.newSingleThreadScheduledExecutor(); + } + return scheduler; + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + throw new IllegalStateException("Unable to acquire the Scheduler", + e); + } + } + + private void releaseScheduler() { + try { + lock.lockInterruptibly(); + try { + schedulerReferences -= 1; + + if (schedulerReferences == 0) { + scheduler.shutdown(); + scheduler = null; + } + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + /** + * Create a stream with the default configured buffer, executor size, queue, + * queue policy and pushback policy. This is equivalent to calling + * + * <code> + * buildStream(source).create(); + * </code> + * + * <p> + * This stream will be buffered from the event producer, and will honour + * back pressure even if the source does not. + * + * <p> + * Buffered streams are useful for "bursty" event sources which produce a + * number of events close together, then none for some time. These bursts + * can sometimes overwhelm downstream processors. Buffering will not, + * however, protect downstream components from a source which produces + * events faster (on average) than they can be consumed. + * + * <p> + * Event delivery will not begin until a terminal operation is reached on + * the chain of AsyncStreams. Once a terminal operation is reached the + * stream will be connected to the event source. + * + * @param eventSource + * @return A {@link PushStream} with a default initial buffer + */ + public <T> PushStream<T> createStream(PushEventSource<T> eventSource) { + return createStream(eventSource, 1, null, new ArrayBlockingQueue<>(32), + FAIL.getPolicy(), LINEAR.getPolicy(1000)); + } + + /** + * Builds a push stream with custom configuration. + * + * <p> + * + * The resulting {@link PushStream} may be buffered or unbuffered depending + * on how it is configured. + * + * @param eventSource The source of the events + * + * @return A {@link PushStreamBuilder} for the stream + */ + public <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildStream( + PushEventSource<T> eventSource) { + return new PushStreamBuilderImpl<T,U>(this, null, eventSource); + } + + @SuppressWarnings({ + "rawtypes", "unchecked" + }) + <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream( + PushEventSource<T> eventSource, int parallelism, Executor executor, + U queue, QueuePolicy<T,U> queuePolicy, + PushbackPolicy<T,U> pushbackPolicy) { + + if (eventSource == null) { + throw new NullPointerException("There is no source of events"); + } + + if (parallelism < 0) { + throw new IllegalArgumentException( + "The supplied parallelism cannot be less than zero. It was " + + parallelism); + } else if (parallelism == 0) { + parallelism = 1; + } + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(parallelism); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + if (queue == null) { + queue = (U) new ArrayBlockingQueue(32); + } + + if (queuePolicy == null) { + queuePolicy = FAIL.getPolicy(); + } + + if (pushbackPolicy == null) { + pushbackPolicy = LINEAR.getPolicy(1000); + } + + @SuppressWarnings("resource") + PushStream<T> stream = new BufferedPushStreamImpl<>(this, + acquireScheduler(), queue, parallelism, toUse, queuePolicy, + pushbackPolicy, aec -> { + try { + return eventSource.open(aec); + } catch (Exception e) { + throw new RuntimeException( + "Unable to connect to event source", e); + } + }); + + stream = stream.onClose(() -> { + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }).map(Function.identity()); + return stream; + } + + <T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource, + Executor executor) { + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(2); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + @SuppressWarnings("resource") + PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, toUse, + acquireScheduler(), aec -> { + try { + return eventSource.open(aec); + } catch (Exception e) { + throw new RuntimeException( + "Unable to connect to event source", e); + } + }); + + stream = stream.onClose(() -> { + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }).map(Function.identity()); + + return stream; + } + + /** + * Convert an {@link PushStream} into an {@link PushEventSource}. The first + * call to {@link PushEventSource#open(PushEventConsumer)} will begin event + * processing. + * + * The {@link PushEventSource} will remain active until the backing stream + * is closed, and permits multiple consumers to + * {@link PushEventSource#open(PushEventConsumer)} it. + * + * This is equivalent to: <code> + * buildEventSourceFromStream(stream).create(); + * </code> + * + * @param stream + * @return a {@link PushEventSource} backed by the {@link PushStream} + */ + public <T> PushEventSource<T> createEventSourceFromStream( + PushStream<T> stream) { + return buildEventSourceFromStream(stream).create(); + } + + /** + * Convert an {@link PushStream} into an {@link PushEventSource}. The first + * call to {@link PushEventSource#open(PushEventConsumer)} will begin event + * processing. + * + * The {@link PushEventSource} will remain active until the backing stream + * is closed, and permits multiple consumers to + * {@link PushEventSource#open(PushEventConsumer)} it. + * + * @param stream + * + * @return a {@link PushEventSource} backed by the {@link PushStream} + */ + public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream( + PushStream<T> stream) { + return new AbstractBufferBuilder<PushEventSource<T>,T,U>() { + @Override + public PushEventSource<T> create() { + SimplePushEventSource<T> spes = createSimplePushEventSource( + concurrency, worker, buffer, bufferingPolicy, () -> { + try { + stream.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }); + spes.connectPromise() + .then(p -> stream.forEach(t -> spes.publish(t)) + .onResolve(() -> spes.close())); + return spes; + } + }; + } + + + /** + * Create a {@link SimplePushEventSource} with the supplied type and default + * buffering behaviours. The SimplePushEventSource will respond to back + * pressure requests from the consumers connected to it. + * + * This is equivalent to: <code> + * buildSimpleEventSource(type).create(); + * </code> + * + * @param type + * @return a {@link SimplePushEventSource} + */ + public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> type) { + return createSimplePushEventSource(1, null, + new ArrayBlockingQueue<>(32), + FAIL.getPolicy(), () -> { /* Nothing else to do */ }); + } + + /** + * + * Build a {@link SimplePushEventSource} with the supplied type and custom + * buffering behaviours. The SimplePushEventSource will respond to back + * pressure requests from the consumers connected to it. + * + * @param type + * + * @return a {@link SimplePushEventSource} + */ + + public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<SimplePushEventSource<T>,T,U> buildSimpleEventSource( + Class<T> type) { + return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() { + @Override + public SimplePushEventSource<T> create() { + return createSimplePushEventSource(concurrency, worker, buffer, + bufferingPolicy, () -> { /* Nothing else to do */ }); + } + }; + } + + @SuppressWarnings({ + "unchecked", "rawtypes" + }) + <T, U extends BlockingQueue<PushEvent< ? extends T>>> SimplePushEventSource<T> createSimplePushEventSource( + int parallelism, Executor executor, U queue, + QueuePolicy<T,U> queuePolicy, Runnable onClose) { + + if (parallelism < 0) { + throw new IllegalArgumentException( + "The supplied parallelism cannot be less than zero. It was " + + parallelism); + } else if (parallelism == 0) { + parallelism = 1; + } + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(2); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + if (queue == null) { + queue = (U) new ArrayBlockingQueue(32); + } + + if (queuePolicy == null) { + queuePolicy = FAIL.getPolicy(); + } + + SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>( + toUse, acquireScheduler(), queuePolicy, queue, parallelism, + () -> { + try { + onClose.run(); + } catch (Exception e) { + // TODO log this? + } + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }); + return spes; + } + + /** + * Create a buffered {@link PushEventConsumer} with the default configured + * buffer, executor size, queue, queue policy and pushback policy. This is + * equivalent to calling + * + * <code> + * buildBufferedConsumer(delegate).create(); + * </code> + * + * <p> + * The returned consumer will be buffered from the event source, and will + * honour back pressure requests from its delegate even if the event source + * does not. + * + * <p> + * Buffered consumers are useful for "bursty" event sources which produce a + * number of events close together, then none for some time. These bursts + * can sometimes overwhelm the consumer. Buffering will not, however, + * protect downstream components from a source which produces events faster + * than they can be consumed. + * + * @param delegate + * @return a {@link PushEventConsumer} with a buffer directly before it + */ + public <T> PushEventConsumer<T> createBufferedConsumer( + PushEventConsumer<T> delegate) { + return buildBufferedConsumer(delegate).create(); + } + + /** + * Build a buffered {@link PushEventConsumer} with custom configuration. + * <p> + * The returned consumer will be buffered from the event source, and will + * honour back pressure requests from its delegate even if the event source + * does not. + * <p> + * Buffered consumers are useful for "bursty" event sources which produce a + * number of events close together, then none for some time. These bursts + * can sometimes overwhelm the consumer. Buffering will not, however, + * protect downstream components from a source which produces events faster + * than they can be consumed. + * <p> + * Buffers are also useful as "circuit breakers". If a + * {@link QueuePolicyOption#FAIL} is used then a full buffer will request + * that the stream close, preventing an event storm from reaching the + * client. + * <p> + * Note that this buffered consumer will close when it receives a terminal + * event, or if the delegate returns negative backpressure. No further + * events will be propagated after this time. + * + * @param delegate + * @return a {@link PushEventConsumer} with a buffer directly before it + */ + public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventConsumer<T>,T,U> buildBufferedConsumer( + PushEventConsumer<T> delegate) { + return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() { + @Override + public PushEventConsumer<T> create() { + PushEventPipe<T> pipe = new PushEventPipe<>(); + + createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure) + .forEachEvent(delegate); + + return pipe; + } + }; + } + + static final class PushEventPipe<T> + implements PushEventConsumer<T>, PushEventSource<T> { + + volatile PushEventConsumer< ? super T> delegate; + + @Override + public AutoCloseable open(PushEventConsumer< ? super T> pec) + throws Exception { + return () -> { /* Nothing else to do */ }; + } + + @Override + public long accept(PushEvent< ? extends T> event) throws Exception { + return delegate.accept(event); + } + + } + + /** + * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The + * data from the stream will be pushed into the PushStream synchronously as + * it is opened. This may make terminal operations blocking unless a buffer + * has been added to the {@link PushStream}. Care should be taken with + * infinite {@link Stream}s to avoid blocking indefinitely. + * + * @param items The items to push into the PushStream + * @return A PushStream containing the items from the Java Stream + */ + public <T> PushStream<T> streamOf(Stream<T> items) { + PushEventSource<T> pes = aec -> { + AtomicBoolean closed = new AtomicBoolean(false); + + items.mapToLong(i -> { + try { + long returnValue = closed.get() ? -1 : aec.accept(data(i)); + if (returnValue < 0) { + aec.accept(PushEvent.<T> close()); + } + return returnValue; + } catch (Exception e) { + try { + aec.accept(PushEvent.<T> error(e)); + } catch (Exception e2) {/* No further events needed */} + return -1; + } + }).filter(i -> i < 0).findFirst().orElseGet(() -> { + try { + return aec.accept(PushEvent.<T> close()); + } catch (Exception e) { + return -1; + } + }); + + return () -> closed.set(true); + }; + + return this.<T> createUnbufferedStream(pes, null); + } + + /** + * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The + * data from the stream will be pushed into the PushStream asynchronously + * using the supplied Executor. + * + * @param executor The worker to use to push items from the Stream into the + * PushStream + * @param items The items to push into the PushStream + * @return A PushStream containing the items from the Java Stream + */ + public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) { + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(2); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + @SuppressWarnings("resource") + PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>( + this, toUse, acquireScheduler(), aec -> { + return () -> { /* No action to take */ }; + }) { + + @Override + protected boolean begin() { + if (super.begin()) { + Iterator<T> it = items.iterator(); + + toUse.execute(() -> pushData(it)); + + return true; + } + return false; + } + + private void pushData(Iterator<T> it) { + while (it.hasNext()) { + try { + long returnValue = closed.get() == CLOSED ? -1 + : handleEvent(data(it.next())); + if (returnValue != 0) { + if (returnValue < 0) { + close(); + return; + } else { + scheduler.schedule( + () -> toUse.execute(() -> pushData(it)), + returnValue, MILLISECONDS); + return; + } + } + } catch (Exception e) { + close(error(e)); + } + } + close(); + } + }; + + stream = stream.onClose(() -> { + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }).map(Function.identity()); + + return stream; + } +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicy.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicy.java new file mode 100644 index 000000000..4f7f1864f --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicy.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import org.osgi.annotation.versioning.ConsumerType; + +/** + * A {@link PushbackPolicy} is used to calculate how much back pressure to apply + * based on the current buffer. The {@link PushbackPolicy} will be called after + * an event has been queued, and the returned value will be used as back + * pressure. + * + * @see PushbackPolicyOption + * + * + * @param <T> The type of the data + * @param <U> The type of the queue + */ +@ConsumerType +@FunctionalInterface +public interface PushbackPolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> { + + /** + * Given the current state of the queue, determine the level of back + * pressure that should be applied + * + * @param queue + * @return a back pressure value in nanoseconds + * @throws Exception + */ + public long pushback(U queue) throws Exception; + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicyOption.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicyOption.java new file mode 100644 index 000000000..ecd0e3ea3 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicyOption.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * {@link PushbackPolicyOption} provides a standard set of simple + * {@link PushbackPolicy} implementations. + * + * @see PushbackPolicy + */ +public enum PushbackPolicyOption { + + /** + * Returns a fixed amount of back pressure, independent of how full the + * buffer is + */ + FIXED { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) { + return q -> value; + } + }, + /** + * Returns zero back pressure until the buffer is full, then it returns a + * fixed value + */ + ON_FULL_FIXED { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) { + return q -> q.remainingCapacity() == 0 ? value : 0; + } + }, + /** + * Returns zero back pressure until the buffer is full, then it returns an + * exponentially increasing amount, starting with the supplied value and + * doubling it each time. Once the buffer is no longer full the back + * pressure returns to zero. + */ + ON_FULL_EXPONENTIAL { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) { + AtomicInteger backoffCount = new AtomicInteger(0); + return q -> { + if (q.remainingCapacity() == 0) { + return value << backoffCount.getAndIncrement(); + } + backoffCount.set(0); + return 0; + }; + + } + }, + /** + * Returns zero back pressure when the buffer is empty, then it returns a + * linearly increasing amount of back pressure based on how full the buffer + * is. The maximum value will be returned when the buffer is full. + */ + LINEAR { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) { + return q -> { + long remainingCapacity = q.remainingCapacity(); + long used = q.size(); + return (value * used) / (used + remainingCapacity); + }; + } + }; + + /** + * Create a {@link PushbackPolicy} instance configured with a base back + * pressure time in nanoseconds + * + * The actual backpressure returned will vary based on the selected + * implementation, the base value, and the state of the buffer. + * + * @param value + * @return A {@link PushbackPolicy} to use + */ + public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value); + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicy.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicy.java new file mode 100644 index 000000000..cba94b16c --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicy.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; + +import org.osgi.annotation.versioning.ConsumerType; +import org.osgi.util.pushstream.PushEvent.EventType; + +/** + * A {@link QueuePolicy} is used to control how events should be queued in the + * current buffer. The {@link QueuePolicy} will be called when an event has + * arrived. + * + * @see QueuePolicyOption + * + * + * @param <T> The type of the data + * @param <U> The type of the queue + */ + +@ConsumerType +@FunctionalInterface +public interface QueuePolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> { + + /** + * Enqueue the event and return the remaining capacity available for events + * + * @param queue + * @param event + * @throws Exception If an error ocurred adding the event to the queue. This + * exception will cause the connection between the + * {@link PushEventSource} and the {@link PushEventConsumer} to be + * closed with an {@link EventType#ERROR} + */ + public void doOffer(U queue, PushEvent<? extends T> event) throws Exception; + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicyOption.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicyOption.java new file mode 100644 index 000000000..35df890ee --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicyOption.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; + +/** + * {@link QueuePolicyOption} provides a standard set of simple + * {@link QueuePolicy} implementations. + * + * @see QueuePolicy + */ +public enum QueuePolicyOption { + /** + * Attempt to add the supplied event to the queue. If the queue is unable to + * immediately accept the value then discard the value at the head of the + * queue and try again. Repeat this process until the event is enqueued. + */ + DISCARD_OLDEST { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() { + return (queue, event) -> { + while (!queue.offer(event)) { + queue.poll(); + } + }; + } + }, + /** + * Attempt to add the supplied event to the queue, blocking until the + * enqueue is successful. + */ + BLOCK { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() { + return (queue, event) -> { + try { + queue.put(event); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }; + } + }, + /** + * Attempt to add the supplied event to the queue, throwing an exception if + * the queue is full. + */ + FAIL { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() { + return (queue, event) -> queue.add(event); + } + }; + + /** + * @return a {@link QueuePolicy} implementation + */ + public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy(); + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSource.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSource.java new file mode 100644 index 000000000..747b4530d --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSource.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import org.osgi.annotation.versioning.ProviderType; +import org.osgi.util.promise.Promise; + +/** + * A {@link SimplePushEventSource} is a helper that makes it simpler to write a + * {@link PushEventSource}. Users do not need to manage multiple registrations + * to the stream, nor do they have to be concerned with back pressure. + * + * @param <T> The type of the events produced by this source + */ +@ProviderType +public interface SimplePushEventSource<T> + extends PushEventSource<T>, AutoCloseable { + /** + * Close this source. Calling this method indicates that there will never be + * any more events published by it. Calling this method sends a close event + * to all connected consumers. After calling this method any + * {@link PushEventConsumer} that tries to {@link #open(PushEventConsumer)} + * this source will immediately receive a close event. + */ + @Override + void close(); + + /** + * Asynchronously publish an event to this stream and all connected + * {@link PushEventConsumer} instances. When this method returns there is no + * guarantee that all consumers have been notified. Events published by a + * single thread will maintain their relative ordering, however they may be + * interleaved with events from other threads. + * + * @param t + * @throws IllegalStateException if the source is closed + */ + void publish(T t); + + /** + * Close this source for now, but potentially reopen it later. Calling this + * method asynchronously sends a close event to all connected consumers. + * After calling this method any {@link PushEventConsumer} that wishes may + * {@link #open(PushEventConsumer)} this source, and will receive subsequent + * events. + */ + void endOfStream(); + + /** + * Close this source for now, but potentially reopen it later. Calling this + * method asynchronously sends an error event to all connected consumers. + * After calling this method any {@link PushEventConsumer} that wishes may + * {@link #open(PushEventConsumer)} this source, and will receive subsequent + * events. + * + * @param e the error + */ + void error(Exception e); + + /** + * Determine whether there are any {@link PushEventConsumer}s for this + * {@link PushEventSource}. This can be used to skip expensive event + * creation logic when there are no listeners. + * + * @return true if any consumers are currently connected + */ + boolean isConnected(); + + /** + * This method can be used to delay event generation until an event source + * has connected. The returned promise will resolve as soon as one or more + * {@link PushEventConsumer} instances have opened the + * SimplePushEventSource. + * <p> + * The returned promise may already be resolved if this + * {@link SimplePushEventSource} already has connected consumers. If the + * {@link SimplePushEventSource} is closed before the returned Promise + * resolves then it will be failed with an {@link IllegalStateException}. + * <p> + * Note that the connected consumers are able to asynchronously close their + * connections to this {@link SimplePushEventSource}, and therefore it is + * possible that once the promise resolves this + * {@link SimplePushEventSource} may no longer be connected to any + * consumers. + * + * @return A promise representing the connection state of this EventSource + */ + Promise<Void> connectPromise(); + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java new file mode 100644 index 000000000..e31c9bf59 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java @@ -0,0 +1,337 @@ +package org.osgi.util.pushstream; + +import static java.util.Collections.emptyList; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.stream.Collectors.toList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; + +import org.osgi.util.promise.Deferred; +import org.osgi.util.promise.Promise; +import org.osgi.util.promise.Promises; + +class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> + implements SimplePushEventSource<T> { + + private final Object lock = new Object(); + + private final Executor worker; + + private final ScheduledExecutorService scheduler; + + private final QueuePolicy<T,U> queuePolicy; + + private final U queue; + + private final int parallelism; + + private final Semaphore semaphore; + + private final List<PushEventConsumer< ? super T>> connected = new ArrayList<>(); + + private final Runnable onClose; + + private boolean closed; + + private Deferred<Void> connectPromise; + + private boolean waitForFinishes; + + + public SimplePushEventSourceImpl(Executor worker, + ScheduledExecutorService scheduler, QueuePolicy<T,U> queuePolicy, + U queue, int parallelism, Runnable onClose) { + this.worker = worker; + this.scheduler = scheduler; + this.queuePolicy = queuePolicy; + this.queue = queue; + this.parallelism = parallelism; + this.semaphore = new Semaphore(parallelism); + this.onClose = onClose; + this.closed = false; + this.connectPromise = null; + } + + @Override + public AutoCloseable open(PushEventConsumer< ? super T> pec) + throws Exception { + Deferred<Void> toResolve = null; + synchronized (lock) { + if (closed) { + throw new IllegalStateException( + "This PushEventConsumer is closed"); + } + + toResolve = connectPromise; + connectPromise = null; + + connected.add(pec); + } + + if (toResolve != null) { + toResolve.resolve(null); + } + + return () -> { + closeConsumer(pec, PushEvent.close()); + }; + } + + private void closeConsumer(PushEventConsumer< ? super T> pec, + PushEvent<T> event) { + boolean sendClose; + synchronized (lock) { + sendClose = connected.remove(pec); + } + if (sendClose) { + doSend(pec, event); + } + } + + private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) { + try { + worker.execute(() -> safePush(pec, event)); + } catch (RejectedExecutionException ree) { + // TODO log? + if (!event.isTerminal()) { + close(PushEvent.error(ree)); + } else { + safePush(pec, event); + } + } + } + + @SuppressWarnings("boxing") + private Promise<Long> doSendWithBackPressure( + PushEventConsumer< ? super T> pec, PushEvent<T> event) { + Deferred<Long> d = new Deferred<>(); + try { + worker.execute( + () -> d.resolve(System.nanoTime() + safePush(pec, event))); + } catch (RejectedExecutionException ree) { + // TODO log? + if (!event.isTerminal()) { + close(PushEvent.error(ree)); + return Promises.resolved(System.nanoTime()); + } else { + return Promises + .resolved(System.nanoTime() + safePush(pec, event)); + } + } + return d.getPromise(); + } + + private long safePush(PushEventConsumer< ? super T> pec, + PushEvent<T> event) { + try { + long backpressure = pec.accept(event) * 1000000; + if (backpressure < 0 && !event.isTerminal()) { + closeConsumer(pec, PushEvent.close()); + return -1; + } + return backpressure; + } catch (Exception e) { + // TODO log? + if (!event.isTerminal()) { + closeConsumer(pec, PushEvent.error(e)); + } + return -1; + } + } + + @Override + public void close() { + close(PushEvent.close()); + } + + private void close(PushEvent<T> event) { + List<PushEventConsumer< ? super T>> toClose; + Deferred<Void> toFail = null; + synchronized (lock) { + if(!closed) { + closed = true; + + toClose = new ArrayList<>(connected); + connected.clear(); + queue.clear(); + + if(connectPromise != null) { + toFail = connectPromise; + connectPromise = null; + } + } else { + toClose = emptyList(); + } + } + + toClose.stream().forEach(pec -> doSend(pec, event)); + + if (toFail != null) { + toFail.resolveWith(closedConnectPromise()); + } + + onClose.run(); + } + + @Override + public void publish(T t) { + enqueueEvent(PushEvent.data(t)); + } + + @Override + public void endOfStream() { + enqueueEvent(PushEvent.close()); + } + + @Override + public void error(Exception e) { + enqueueEvent(PushEvent.error(e)); + } + + private void enqueueEvent(PushEvent<T> event) { + synchronized (lock) { + if (closed || connected.isEmpty()) { + return; + } + } + + try { + queuePolicy.doOffer(queue, event); + boolean start; + synchronized (lock) { + start = !waitForFinishes && semaphore.tryAcquire(); + } + if (start) { + startWorker(); + } + } catch (Exception e) { + close(PushEvent.error(e)); + throw new IllegalStateException( + "The queue policy threw an exception", e); + } + } + + @SuppressWarnings({ + "unchecked", "boxing" + }) + private void startWorker() { + worker.execute(() -> { + try { + + for(;;) { + PushEvent<T> event; + List<PushEventConsumer< ? super T>> toCall; + boolean resetWait = false; + synchronized (lock) { + if(waitForFinishes) { + semaphore.release(); + while(waitForFinishes) { + lock.notifyAll(); + lock.wait(); + } + semaphore.acquire(); + } + + event = (PushEvent<T>) queue.poll(); + + if(event == null) { + break; + } + + toCall = new ArrayList<>(connected); + if (event.isTerminal()) { + waitForFinishes = true; + resetWait = true; + connected.clear(); + while (!semaphore.tryAcquire(parallelism - 1)) { + lock.wait(); + } + } + } + + List<Promise<Long>> calls = toCall.stream().map(pec -> { + if (semaphore.tryAcquire()) { + try { + return doSendWithBackPressure(pec, event); + } finally { + semaphore.release(); + } + } else { + return Promises.resolved( + System.nanoTime() + safePush(pec, event)); + } + }).collect(toList()); + + long toWait = Promises.all(calls) + .map(l -> l.stream() + .max(Long::compareTo) + .orElseGet(() -> System.nanoTime())) + .getValue() - System.nanoTime(); + + + if (toWait > 0) { + scheduler.schedule(this::startWorker, toWait, + NANOSECONDS); + return; + } + + if (resetWait == true) { + synchronized (lock) { + waitForFinishes = false; + lock.notifyAll(); + } + } + } + + semaphore.release(); + } catch (Exception e) { + close(PushEvent.error(e)); + } + if (queue.peek() != null && semaphore.tryAcquire()) { + try { + startWorker(); + } catch (Exception e) { + close(PushEvent.error(e)); + } + } + }); + + } + + @Override + public boolean isConnected() { + synchronized (lock) { + return !connected.isEmpty(); + } + } + + @Override + public Promise<Void> connectPromise() { + synchronized (lock) { + if (closed) { + return closedConnectPromise(); + } + + if (connected.isEmpty()) { + if (connectPromise == null) { + connectPromise = new Deferred<>(); + } + return connectPromise.getPromise(); + } else { + return Promises.resolved(null); + } + } + } + + private Promise<Void> closedConnectPromise() { + return Promises.failed(new IllegalStateException( + "This SimplePushEventSource is closed")); + } + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java new file mode 100644 index 000000000..faf9e6584 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java @@ -0,0 +1,73 @@ +package org.osgi.util.pushstream; + +import static java.util.Optional.ofNullable; +import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> + extends AbstractPushStreamImpl<T> implements PushStream<T> { + + protected final Function<PushEventConsumer<T>,AutoCloseable> connector; + + protected final AtomicReference<AutoCloseable> upstream = new AtomicReference<AutoCloseable>(); + + UnbufferedPushStreamImpl(PushStreamProvider psp, + Executor executor, ScheduledExecutorService scheduler, + Function<PushEventConsumer<T>,AutoCloseable> connector) { + super(psp, executor, scheduler); + this.connector = connector; + } + + @Override + protected boolean close(PushEvent<T> event) { + if(super.close(event)) { + ofNullable(upstream.getAndSet(() -> { + // This block doesn't need to do anything, but the presence + // of the Closable is needed to prevent duplicate begins + })).ifPresent(c -> { + try { + c.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }); + return true; + } + return false; + } + + @Override + protected boolean begin() { + if(closed.compareAndSet(BUILDING, STARTED)) { + AutoCloseable toClose = connector.apply(this::handleEvent); + if(!upstream.compareAndSet(null,toClose)) { + //TODO log that we tried to connect twice... + try { + toClose.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + if (closed.get() == CLOSED + && upstream.compareAndSet(toClose, null)) { + // We closed before setting the upstream - close it now + try { + toClose.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + return true; + } + return false; + } +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/package-info.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/package-info.java new file mode 100644 index 000000000..6a28fa0b5 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/package-info.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) OSGi Alliance (2015). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Push Stream Package Version 1.0. + * + * <p> + * Bundles wishing to use this package must list the package in the + * Import-Package header of the bundle's manifest. + * + * <p> + * Example import for consumers using the API in this package: + * <p> + * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,2.0)"} + * <p> + * Example import for providers implementing the API in this package: + * <p> + * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,1.1)"} + * + * @author $Id$ + */ + +@Version("1.0") +package org.osgi.util.pushstream; + +import org.osgi.annotation.versioning.Version; |