Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java')
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java68
1 files changed, 55 insertions, 13 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
index 2293c1aad..d64ac129d 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
@@ -37,14 +37,14 @@ import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
+import org.osgi.util.function.Function;
+import org.osgi.util.function.Predicate;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.TimeoutException;
@@ -52,6 +52,8 @@ import org.osgi.util.pushstream.PushEvent.EventType;
abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
+ private final Function<T,T> IDENTITY = x -> x;
+
static enum State {
BUILDING, STARTED, CLOSED
}
@@ -70,6 +72,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
protected abstract boolean begin();
+ protected abstract void upstreamClose(PushEvent< ? > close);
+
AbstractPushStreamImpl(PushStreamProvider psp,
Executor executor, ScheduledExecutorService scheduler) {
this.psp = psp;
@@ -107,16 +111,23 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public void close() {
- close(PushEvent.close());
+ PushEvent<T> close = PushEvent.close();
+ if (close(close, true)) {
+ upstreamClose(close);
+ }
}
protected boolean close(PushEvent<T> event) {
+ return close(event, true);
+ }
+
+ protected boolean close(PushEvent<T> event, boolean sendDownStreamEvent) {
if(!event.isTerminal()) {
throw new IllegalArgumentException("The event " + event + " is not a close event.");
}
if(closed.getAndSet(CLOSED) != CLOSED) {
PushEventConsumer<T> aec = next.getAndSet(null);
- if(aec != null) {
+ if (sendDownStreamEvent && aec != null) {
try {
aec.accept(event);
} catch (Exception e) {
@@ -411,7 +422,10 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
scheduler.schedule(() -> check(lastTime, timeout),
timeout - elapsed, NANOSECONDS);
} else {
- close(PushEvent.error(new TimeoutException()));
+ PushEvent<T> error = PushEvent.error(new TimeoutException());
+ close(error);
+ // Upstream close is needed as we have no direct backpressure
+ upstreamClose(error);
}
}
@@ -461,10 +475,18 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
ex.execute(() -> {
try {
if (eventStream.handleEvent(event) < 0) {
- eventStream.close(PushEvent.close());
+ PushEvent<T> close = PushEvent.close();
+ eventStream.close(close);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(close);
}
} catch (Exception e1) {
- close(PushEvent.error(e1));
+ PushEvent<T> error = PushEvent.error(e1);
+ close(error);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(error);
} finally {
s.release(1);
}
@@ -541,7 +563,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
// TODO Auto-generated catch block
e.printStackTrace();
}
- }).map(Function.identity());
+ }).map(IDENTITY);
}
@Override
@@ -591,6 +613,12 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
}
return false;
}
+
+ @Override
+ protected void upstreamClose(PushEvent< ? > close) {
+ AbstractPushStreamImpl.this.upstreamClose(close);
+ source.close();
+ }
};
@@ -607,7 +635,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
// TODO Auto-generated catch block
e.printStackTrace();
}
- }).map(Function.identity());
+ }).map(IDENTITY);
}
@SuppressWarnings("unchecked")
@@ -803,7 +831,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
private <R> long aggregateAndForward(Function<Collection<T>,R> f,
AbstractPushStreamImpl<R> eventStream,
- PushEvent< ? extends T> event, Queue<T> queue) {
+ PushEvent< ? extends T> event, Queue<T> queue) throws Exception {
if (!queue.offer(event.getData())) {
((ArrayQueue<T>) queue).forcePush(event.getData());
}
@@ -820,7 +848,13 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public <R> PushStream<R> window(Duration time, Executor executor,
Function<Collection<T>,R> f) {
- return window(() -> time, () -> 0, executor, (t, c) -> f.apply(c));
+ return window(() -> time, () -> 0, executor, (t, c) -> {
+ try {
+ return f.apply(c);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
}
@Override
@@ -946,7 +980,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
.toMillis(elapsed)),
collected)));
} catch (Exception e) {
- close(PushEvent.error(e));
+ PushEvent<T> error = PushEvent.error(e);
+ close(error);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(error);
}
});
}
@@ -1126,7 +1164,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
Long.valueOf(NANOSECONDS.toMillis(elapsed)),
collected)));
} catch (Exception e) {
- close(PushEvent.error(e));
+ PushEvent<T> error = PushEvent.error(e);
+ close(error);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(error);
}
});
}

Back to the top