diff options
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java')
-rw-r--r-- | bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java | 68 |
1 files changed, 55 insertions, 13 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java index 2293c1aad..d64ac129d 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java @@ -37,14 +37,14 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.IntFunction; import java.util.function.IntSupplier; -import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collector; import java.util.stream.Collectors; +import org.osgi.util.function.Function; +import org.osgi.util.function.Predicate; import org.osgi.util.promise.Deferred; import org.osgi.util.promise.Promise; import org.osgi.util.promise.TimeoutException; @@ -52,6 +52,8 @@ import org.osgi.util.pushstream.PushEvent.EventType; abstract class AbstractPushStreamImpl<T> implements PushStream<T> { + private final Function<T,T> IDENTITY = x -> x; + static enum State { BUILDING, STARTED, CLOSED } @@ -70,6 +72,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { protected abstract boolean begin(); + protected abstract void upstreamClose(PushEvent< ? > close); + AbstractPushStreamImpl(PushStreamProvider psp, Executor executor, ScheduledExecutorService scheduler) { this.psp = psp; @@ -107,16 +111,23 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public void close() { - close(PushEvent.close()); + PushEvent<T> close = PushEvent.close(); + if (close(close, true)) { + upstreamClose(close); + } } protected boolean close(PushEvent<T> event) { + return close(event, true); + } + + protected boolean close(PushEvent<T> event, boolean sendDownStreamEvent) { if(!event.isTerminal()) { throw new IllegalArgumentException("The event " + event + " is not a close event."); } if(closed.getAndSet(CLOSED) != CLOSED) { PushEventConsumer<T> aec = next.getAndSet(null); - if(aec != null) { + if (sendDownStreamEvent && aec != null) { try { aec.accept(event); } catch (Exception e) { @@ -411,7 +422,10 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { scheduler.schedule(() -> check(lastTime, timeout), timeout - elapsed, NANOSECONDS); } else { - close(PushEvent.error(new TimeoutException())); + PushEvent<T> error = PushEvent.error(new TimeoutException()); + close(error); + // Upstream close is needed as we have no direct backpressure + upstreamClose(error); } } @@ -461,10 +475,18 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { ex.execute(() -> { try { if (eventStream.handleEvent(event) < 0) { - eventStream.close(PushEvent.close()); + PushEvent<T> close = PushEvent.close(); + eventStream.close(close); + // Upstream close is needed as we have no direct + // backpressure + upstreamClose(close); } } catch (Exception e1) { - close(PushEvent.error(e1)); + PushEvent<T> error = PushEvent.error(e1); + close(error); + // Upstream close is needed as we have no direct + // backpressure + upstreamClose(error); } finally { s.release(1); } @@ -541,7 +563,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { // TODO Auto-generated catch block e.printStackTrace(); } - }).map(Function.identity()); + }).map(IDENTITY); } @Override @@ -591,6 +613,12 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { } return false; } + + @Override + protected void upstreamClose(PushEvent< ? > close) { + AbstractPushStreamImpl.this.upstreamClose(close); + source.close(); + } }; @@ -607,7 +635,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { // TODO Auto-generated catch block e.printStackTrace(); } - }).map(Function.identity()); + }).map(IDENTITY); } @SuppressWarnings("unchecked") @@ -803,7 +831,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { private <R> long aggregateAndForward(Function<Collection<T>,R> f, AbstractPushStreamImpl<R> eventStream, - PushEvent< ? extends T> event, Queue<T> queue) { + PushEvent< ? extends T> event, Queue<T> queue) throws Exception { if (!queue.offer(event.getData())) { ((ArrayQueue<T>) queue).forcePush(event.getData()); } @@ -820,7 +848,13 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { @Override public <R> PushStream<R> window(Duration time, Executor executor, Function<Collection<T>,R> f) { - return window(() -> time, () -> 0, executor, (t, c) -> f.apply(c)); + return window(() -> time, () -> 0, executor, (t, c) -> { + try { + return f.apply(c); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } @Override @@ -946,7 +980,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { .toMillis(elapsed)), collected))); } catch (Exception e) { - close(PushEvent.error(e)); + PushEvent<T> error = PushEvent.error(e); + close(error); + // Upstream close is needed as we have no direct + // backpressure + upstreamClose(error); } }); } @@ -1126,7 +1164,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> { Long.valueOf(NANOSECONDS.toMillis(elapsed)), collected))); } catch (Exception e) { - close(PushEvent.error(e)); + PushEvent<T> error = PushEvent.error(e); + close(error); + // Upstream close is needed as we have no direct + // backpressure + upstreamClose(error); } }); } |