diff options
11 files changed, 139 insertions, 61 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java index 99f5645c9..f48266bc6 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) OSGi Alliance (2016). All Rights Reserved. + * Copyright (c) OSGi Alliance (2016, 2017). All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,10 +19,11 @@ package org.osgi.service.log.stream; import org.osgi.annotation.versioning.ProviderType; import org.osgi.service.log.LogEntry; import org.osgi.util.pushstream.PushStream; +import org.osgi.util.pushstream.QueuePolicyOption; /** - * LogStreamProvider service for creating a PushStream of {@link LogEntry} - * objects. + * LogStreamProvider service for creating a {@link PushStream} of + * {@link LogEntry} objects. * * @ThreadSafe * @author $Id$ @@ -30,34 +31,42 @@ import org.osgi.util.pushstream.PushStream; @ProviderType public interface LogStreamProvider { /** - * Creation options for the PushStream of {@link LogEntry} objects. + * Creation options for the {@link PushStream} of {@link LogEntry} objects. */ enum Options { /** * Include history. * <p> - * Prime the created PushStream with the past {@link LogEntry} objects. - * The number of past {@link LogEntry} objects is implementation - * specific. + * Prime the created PushStream with the available historical + * {@link LogEntry} objects. The number of available {@link LogEntry} + * objects is implementation specific. * <p> - * The created PushStream will supply the past {@link LogEntry} objects - * followed by newly created {@link LogEntry} objects. + * The created PushStream will supply the available historical + * {@link LogEntry} objects followed by newly created {@link LogEntry} + * objects. */ HISTORY; } /** - * Create a PushStream of {@link LogEntry} objects. + * Create a {@link PushStream} of {@link LogEntry} objects. * <p> - * The returned PushStream is an unbuffered stream with a parallelism of - * one. + * The returned PushStream must: + * <ul> + * <li>Be buffered with a buffer large enough to contain the history, if + * included.</li> + * <li>Have the {@link QueuePolicyOption#DISCARD_OLDEST} queue policy + * option.</li> + * <li>Use a shared executor.</li> + * <li>Have a parallelism of one.</li> + * </ul> * <p> * When this LogStreamProvider service is released by the obtaining bundle, - * this LogStreamProvider service must call {@code close()} on the returned - * PushStream object if it has not already been closed. + * this LogStreamProvider service must call {@link PushStream#close()} on + * the returned PushStream object if it has not already been closed. * * @param options The options to use when creating the PushStream. - * @return A PushStream of {@link LogEntry} objects. + * @return A {@link PushStream} of {@link LogEntry} objects. */ PushStream<LogEntry> createStream(Options... options); } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java index 2293c1aad..d64ac129d 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java @@ -37,14 +37,14 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.IntFunction; import java.util.function.IntSupplier; -import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collector; import java.util.stream.Collectors; +import org.osgi.util.function.Function; +import org.osgi.util.function.Predicate; import org.osgi.util.promise.Deferred; import org.osgi.util.promise.Promise; import org.osgi.util.promise.TimeoutException; @@ -52,6 +52,8 @@ import org.osgi.util.pushstream.PushEvent.EventType; abstract class AbstractPushStreamImpl<T> implements PushStream<T> { + private final Function<T,T> IDENTITY = x -> x; + static enum State { BUILDING, STARTED, CLOSED } @@ -70,6 +72,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { protected abstract boolean begin(); + protected abstract void upstreamClose(PushEvent< ? > close); + AbstractPushStreamImpl(PushStreamProvider psp, Executor executor, ScheduledExecutorService scheduler) { this.psp = psp; @@ -107,16 +111,23 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public void close() { - close(PushEvent.close()); + PushEvent<T> close = PushEvent.close(); + if (close(close, true)) { + upstreamClose(close); + } } protected boolean close(PushEvent<T> event) { + return close(event, true); + } + + protected boolean close(PushEvent<T> event, boolean sendDownStreamEvent) { if(!event.isTerminal()) { throw new IllegalArgumentException("The event " + event + " is not a close event."); } if(closed.getAndSet(CLOSED) != CLOSED) { PushEventConsumer<T> aec = next.getAndSet(null); - if(aec != null) { + if (sendDownStreamEvent && aec != null) { try { aec.accept(event); } catch (Exception e) { @@ -411,7 +422,10 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { scheduler.schedule(() -> check(lastTime, timeout), timeout - elapsed, NANOSECONDS); } else { - close(PushEvent.error(new TimeoutException())); + PushEvent<T> error = PushEvent.error(new TimeoutException()); + close(error); + // Upstream close is needed as we have no direct backpressure + upstreamClose(error); } } @@ -461,10 +475,18 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { ex.execute(() -> { try { if (eventStream.handleEvent(event) < 0) { - eventStream.close(PushEvent.close()); + PushEvent<T> close = PushEvent.close(); + eventStream.close(close); + // Upstream close is needed as we have no direct + // backpressure + upstreamClose(close); } } catch (Exception e1) { - close(PushEvent.error(e1)); + PushEvent<T> error = PushEvent.error(e1); + close(error); + // Upstream close is needed as we have no direct + // backpressure + upstreamClose(error); } finally { s.release(1); } @@ -541,7 +563,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { // TODO Auto-generated catch block e.printStackTrace(); } - }).map(Function.identity()); + }).map(IDENTITY); } @Override @@ -591,6 +613,12 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { } return false; } + + @Override + protected void upstreamClose(PushEvent< ? > close) { + AbstractPushStreamImpl.this.upstreamClose(close); + source.close(); + } }; @@ -607,7 +635,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { // TODO Auto-generated catch block e.printStackTrace(); } - }).map(Function.identity()); + }).map(IDENTITY); } @SuppressWarnings("unchecked") @@ -803,7 +831,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { private <R> long aggregateAndForward(Function<Collection<T>,R> f, AbstractPushStreamImpl<R> eventStream, - PushEvent< ? extends T> event, Queue<T> queue) { + PushEvent< ? extends T> event, Queue<T> queue) throws Exception { if (!queue.offer(event.getData())) { ((ArrayQueue<T>) queue).forcePush(event.getData()); } @@ -820,7 +848,13 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public <R> PushStream<R> window(Duration time, Executor executor, Function<Collection<T>,R> f) { - return window(() -> time, () -> 0, executor, (t, c) -> f.apply(c)); + return window(() -> time, () -> 0, executor, (t, c) -> { + try { + return f.apply(c); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } @Override @@ -946,7 +980,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { .toMillis(elapsed)), collected))); } catch (Exception e) { - close(PushEvent.error(e)); + PushEvent<T> error = PushEvent.error(e); + close(error); + // Upstream close is needed as we have no direct + // backpressure + upstreamClose(error); } }); } @@ -1126,7 +1164,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { Long.valueOf(NANOSECONDS.toMillis(elapsed)), collected))); } catch (Exception e) { - close(PushEvent.error(e)); + PushEvent<T> error = PushEvent.error(e); + close(error); + // Upstream close is needed as we have no direct + // backpressure + upstreamClose(error); } }); } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java index 2aa6ec763..dfcf8b503 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java @@ -3,6 +3,8 @@ package org.osgi.util.pushstream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import org.osgi.annotation.versioning.ProviderType; + /** * Create a buffered section of a Push-based stream * @@ -10,6 +12,7 @@ import java.util.concurrent.Executor; * @param <T> The type of objects in the {@link PushEvent} * @param <U> The type of the Queue used in the user specified buffer */ +@ProviderType public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends T>>> { /** @@ -74,6 +77,6 @@ public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends /** * @return the object being built */ - R create(); + R build(); } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java index 3a4da2fd9..320adeebc 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java @@ -31,5 +31,13 @@ class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T> // The base implementation has nothing to do, but // this method is used in windowing } + + @Override + protected void upstreamClose(PushEvent< ? > close) { + if (closed.get() != CLOSED) { + close(close.nodata(), false); + } + previous.upstreamClose(close); + } } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java index 028f0a392..968f668f6 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java @@ -18,6 +18,8 @@ package org.osgi.util.pushstream; import static org.osgi.util.pushstream.PushEvent.EventType.*; +import org.osgi.annotation.versioning.ProviderType; + /** * A PushEvent is an immutable object that is transferred through a * communication channel to push information to a downstream consumer. The event @@ -35,6 +37,7 @@ import static org.osgi.util.pushstream.PushEvent.EventType.*; * @param <T> The payload type of the event. * @Immutable */ +@ProviderType public abstract class PushEvent<T> { /** diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java index c26bc8c4d..1471a84f9 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java @@ -25,14 +25,14 @@ import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.IntFunction; import java.util.function.IntSupplier; -import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collector; import org.osgi.annotation.versioning.ProviderType; +import org.osgi.util.function.Function; +import org.osgi.util.function.Predicate; import org.osgi.util.promise.Promise; import org.osgi.util.promise.TimeoutException; @@ -56,6 +56,16 @@ import org.osgi.util.promise.TimeoutException; public interface PushStream<T> extends AutoCloseable { /** + * Close this PushStream by sending an event of type + * {@link PushEvent.EventType#CLOSE} downstream. Closing a PushStream is a + * safe operation that will not throw an Exception. + * <p> + * Calling <code>close()</code> on a closed PushStream has no effect. + */ + @Override + void close(); + + /** * Must be run after the channel is closed. This handler will run after the * downstream methods have processed the close event and before the upstream * methods have closed. @@ -222,12 +232,8 @@ public interface PushStream<T> extends AutoCloseable { * {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger * the stream to close, preventing an event storm from reaching the client. * - * @param parallelism - * @param executor - * @param queue - * @param queuePolicy - * @param pushbackPolicy - * @return Builder style (can be a new or the same object) + * @return A builder which can be used to configure the buffer for this + * pipeline stage. */ <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer(); diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java index d59c8d9d3..506c8f2e7 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java @@ -3,6 +3,8 @@ package org.osgi.util.pushstream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import org.osgi.annotation.versioning.ProviderType; + /** * A Builder for a PushStream. This Builder extends the support of a standard * BufferBuilder by allowing the PushStream to be unbuffered. @@ -11,6 +13,7 @@ import java.util.concurrent.Executor; * @param <T> The type of objects in the {@link PushEvent} * @param <U> The type of the Queue used in the user specified buffer */ +@ProviderType public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? extends T>>> extends BufferBuilder<PushStream<T>,T,U> { diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java index 5ec7cb336..0bf8d7127 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java @@ -77,7 +77,7 @@ class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> } @Override - public PushStream<T> create() { + public PushStream<T> build() { if (unbuffered) { return psp.createUnbufferedStream(eventSource, previousExecutor); } else { diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java index be87c6bce..f63c66210 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java @@ -32,7 +32,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Function; import java.util.stream.Stream; /** @@ -193,7 +192,7 @@ public final class PushStreamProvider { ((ExecutorService) toUse).shutdown(); } releaseScheduler(); - }).map(Function.identity()); + }).map(x -> x); return stream; } @@ -226,7 +225,7 @@ public final class PushStreamProvider { ((ExecutorService) toUse).shutdown(); } releaseScheduler(); - }).map(Function.identity()); + }).map(x -> x); return stream; } @@ -249,7 +248,7 @@ public final class PushStreamProvider { */ public <T> PushEventSource<T> createEventSourceFromStream( PushStream<T> stream) { - return buildEventSourceFromStream(stream).create(); + return buildEventSourceFromStream(stream).build(); } /** @@ -269,7 +268,7 @@ public final class PushStreamProvider { PushStream<T> stream) { return new AbstractBufferBuilder<PushEventSource<T>,T,U>() { @Override - public PushEventSource<T> create() { + public PushEventSource<T> build() { SimplePushEventSource<T> spes = createSimplePushEventSource( concurrency, worker, buffer, bufferingPolicy, () -> { try { @@ -321,7 +320,7 @@ public final class PushStreamProvider { Class<T> type) { return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() { @Override - public SimplePushEventSource<T> create() { + public SimplePushEventSource<T> build() { return createSimplePushEventSource(concurrency, worker, buffer, bufferingPolicy, () -> { /* Nothing else to do */ }); } @@ -403,7 +402,7 @@ public final class PushStreamProvider { */ public <T> PushEventConsumer<T> createBufferedConsumer( PushEventConsumer<T> delegate) { - return buildBufferedConsumer(delegate).create(); + return buildBufferedConsumer(delegate).build(); } /** @@ -435,7 +434,7 @@ public final class PushStreamProvider { PushEventConsumer<T> delegate) { return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() { @Override - public PushEventConsumer<T> create() { + public PushEventConsumer<T> build() { PushEventPipe<T> pipe = new PushEventPipe<>(); createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure) @@ -574,7 +573,7 @@ public final class PushStreamProvider { ((ExecutorService) toUse).shutdown(); } releaseScheduler(); - }).map(Function.identity()); + }).map(x -> x); return stream; } diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java index faf9e6584..d6116e34e 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java @@ -24,25 +24,30 @@ class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T } @Override - protected boolean close(PushEvent<T> event) { - if(super.close(event)) { - ofNullable(upstream.getAndSet(() -> { - // This block doesn't need to do anything, but the presence - // of the Closable is needed to prevent duplicate begins - })).ifPresent(c -> { - try { - c.close(); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - }); + protected boolean close(PushEvent<T> event, boolean sendDownStreamEvent) { + if (super.close(event, sendDownStreamEvent)) { + upstreamClose(event); return true; } return false; } @Override + protected void upstreamClose(PushEvent< ? > close) { + ofNullable(upstream.getAndSet(() -> { + // This block doesn't need to do anything, but the presence + // of the Closable is needed to prevent duplicate begins + })).ifPresent(c -> { + try { + c.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }); + } + + @Override protected boolean begin() { if(closed.compareAndSet(BUILDING, STARTED)) { AutoCloseable toClose = connector.apply(this::handleEvent); diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java index e32bf225e..7c52b5afe 100644 --- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java @@ -60,7 +60,7 @@ public class LogStreamProviderImpl implements LogStreamProvider { LogEntrySource logEntrySource = new LogEntrySource(withHistory); PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource); //creating an unbuffered stream - PushStream<LogEntry> logStream = streamBuilder.unbuffered().create(); + PushStream<LogEntry> logStream = streamBuilder.unbuffered().build(); logEntrySource.setLogStream(logStream); // Adding to sources makes the source start listening for new entries logEntrySources.add(logEntrySource); |