Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java133
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java11
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java6
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java14
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java20
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java51
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java32
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java4
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java43
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java6
10 files changed, 180 insertions, 140 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
index ef8bb142..cf16c212 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
@@ -33,6 +33,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
@@ -64,6 +65,7 @@ import org.osgi.util.function.Function;
import org.osgi.util.function.Predicate;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.promise.TimeoutException;
import org.osgi.util.pushstream.PushEvent.EventType;
@@ -77,7 +79,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
protected final PushStreamProvider psp;
- protected final PushStreamExecutors executors;
+ protected final PromiseFactory promiseFactory;
protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING);
@@ -91,9 +93,9 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
protected abstract void upstreamClose(PushEvent< ? > close);
AbstractPushStreamImpl(PushStreamProvider psp,
- PushStreamExecutors executors) {
+ PromiseFactory promiseFactory) {
this.psp = psp;
- this.executors = executors;
+ this.promiseFactory = promiseFactory;
}
protected long handleEvent(PushEvent< ? extends T> event) {
@@ -216,7 +218,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> filter(Predicate< ? super T> predicate) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext((event) -> {
try {
if (!event.isTerminal()) {
@@ -239,7 +241,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) {
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext(event -> {
try {
if (!event.isTerminal()) {
@@ -257,10 +259,62 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
}
@Override
+ public <R> PushStream<R> asyncMap(int n, int delay,
+ Function< ? super T,Promise< ? extends R>> mapper)
+ throws IllegalArgumentException, NullPointerException {
+
+ AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+ psp, promiseFactory, this);
+ Semaphore s = new Semaphore(n);
+ updateNext(event -> {
+ try {
+ if (event.isTerminal()) {
+ s.acquire(n);
+ eventStream.close(event.nodata());
+ return ABORT;
+ }
+
+ s.acquire(1);
+
+ Promise< ? extends R> p = mapper.apply(event.getData());
+ p.thenAccept(d -> promiseFactory.executor().execute(() -> {
+ try {
+ if (eventStream
+ .handleEvent(PushEvent.data(d)) < 0) {
+ PushEvent<R> close = PushEvent.close();
+ eventStream.close(close);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(close);
+ }
+ } finally {
+ s.release();
+ }
+ })).onFailure(t -> promiseFactory.executor().execute(() -> {
+ PushEvent<T> error = PushEvent.error(t);
+ close(error);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(error);
+ }));
+
+ // The number active before was one less than the active number
+ int activePromises = Math.max(0, n - s.availablePermits() - 1);
+ return (activePromises + s.getQueueLength()) * delay;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+
+ return eventStream;
+ }
+
+ @Override
public <R> PushStream<R> flatMap(
Function< ? super T, ? extends PushStream< ? extends R>> mapper) {
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
PushEventConsumer<R> consumer = e -> {
switch (e.getType()) {
@@ -320,7 +374,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public PushStream<T> sorted(Comparator< ? super T> comparator) {
List<T> list = Collections.synchronizedList(new ArrayList<>());
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext(event -> {
try {
switch(event.getType()) {
@@ -355,7 +409,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
throw new IllegalArgumentException("The limit must be greater than zero");
}
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
AtomicLong counter = new AtomicLong(maxSize);
updateNext(event -> {
try {
@@ -381,11 +435,12 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> limit(Duration maxTime) {
- Runnable start = () -> executors.schedule(() -> close(),
+ Runnable start = () -> promiseFactory.scheduledExecutor().schedule(
+ () -> close(),
maxTime.toNanos(), NANOSECONDS);
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
- psp, executors, this) {
+ psp, promiseFactory, this) {
@Override
protected void beginning() {
start.run();
@@ -409,11 +464,12 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long timeout = maxTime.toNanos();
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
- psp, executors, this) {
+ psp, promiseFactory, this) {
@Override
protected void beginning() {
lastTime.set(System.nanoTime());
- executors.schedule(() -> check(lastTime, timeout), timeout,
+ promiseFactory.scheduledExecutor().schedule(
+ () -> check(lastTime, timeout), timeout,
NANOSECONDS);
}
};
@@ -434,7 +490,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long elapsed = now - lastTime.get();
if (elapsed < timeout) {
- executors.schedule(() -> check(lastTime, timeout),
+ promiseFactory.scheduledExecutor().schedule(
+ () -> check(lastTime, timeout),
timeout - elapsed, NANOSECONDS);
} else {
PushEvent<T> error = PushEvent.error(new TimeoutException());
@@ -451,7 +508,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
"The number to skip must be greater than or equal to zero");
}
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
AtomicLong counter = new AtomicLong(n);
updateNext(event -> {
try {
@@ -475,7 +532,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> fork(int n, int delay, Executor ex) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, new PushStreamExecutors(ex, executors.scheduledExecutor()),
+ psp, new PromiseFactory(Objects.requireNonNull(ex),
+ promiseFactory.scheduledExecutor()),
this);
Semaphore s = new Semaphore(n);
updateNext(event -> {
@@ -538,7 +596,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public PushStream<T> merge(
PushEventSource< ? extends T> source) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
AtomicInteger count = new AtomicInteger(2);
PushEventConsumer<T> consumer = event -> {
try {
@@ -620,7 +678,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@SuppressWarnings("resource")
AbstractPushStreamImpl<T> eventStream = new AbstractPushStreamImpl<T>(
- psp, executors) {
+ psp, promiseFactory) {
@Override
protected boolean begin() {
if (closed.compareAndSet(BUILDING, STARTED)) {
@@ -660,7 +718,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length);
AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length];
for(int i = 0; i < tests.length; i++) {
- rsult[i] = new IntermediatePushStreamImpl<>(psp, executors, this);
+ rsult[i] = new IntermediatePushStreamImpl<>(psp, promiseFactory, this);
}
Boolean[] array = new Boolean[tests.length];
@@ -716,7 +774,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> sequential() {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
Lock lock = new ReentrantLock();
updateNext((event) -> {
try {
@@ -738,7 +796,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R> PushStream<R> coalesce(
Function< ? super T,Optional<R>> accumulator) {
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext((event) -> {
try {
if (!event.isTerminal()) {
@@ -785,7 +843,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@SuppressWarnings("resource")
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
- psp, executors, this) {
+ psp, promiseFactory, this) {
@Override
protected void beginning() {
init.run();
@@ -857,7 +915,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public <R> PushStream<R> window(Duration time,
Function<Collection<T>,R> f) {
- return window(time, executors.executor(), f);
+ return window(time, promiseFactory.executor(), f);
}
@Override
@@ -876,7 +934,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R> PushStream<R> window(Supplier<Duration> time,
IntSupplier maxEvents,
BiFunction<Long,Collection<T>,R> f) {
- return window(time, maxEvents, executors.executor(), f);
+ return window(time, maxEvents, promiseFactory.executor(), f);
}
@Override
@@ -902,7 +960,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long windowSize = time.get().toNanos();
previousWindowSize.set(windowSize);
- executors.schedule(
+ promiseFactory.scheduledExecutor().schedule(
getWindowTask(p, f, time, maxEvents, lock, count,
queueRef, timestamp, counter,
previousWindowSize, ex),
@@ -914,7 +972,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@SuppressWarnings("resource")
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
- psp, new PushStreamExecutors(ex, executors.scheduledExecutor()),
+ psp, new PromiseFactory(Objects.requireNonNull(ex),
+ promiseFactory.scheduledExecutor()),
this) {
@Override
protected void beginning() {
@@ -968,7 +1027,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long nextWindow = time.get().toNanos();
long backpressure = previousWindowSize.getAndSet(nextWindow)
- elapsed;
- executors.schedule(
+ promiseFactory.scheduledExecutor().schedule(
getWindowTask(eventStream, f, time, maxEvents, lock,
newCount, queueRef, timestamp, counter,
previousWindowSize, ex),
@@ -1194,7 +1253,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long nextWindow = time.get().toNanos();
previousWindowSize.set(nextWindow);
queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
- executors.schedule(
+ promiseFactory.scheduledExecutor().schedule(
getWindowTask(eventStream, f, time, maxEvents, lock,
expectedCounter + 1, queueRef, timestamp, counter,
previousWindowSize, executor),
@@ -1226,7 +1285,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> adjustBackPressure(LongUnaryOperator adjustment) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext(event -> {
try {
long bp = eventStream.handleEvent(event);
@@ -1247,7 +1306,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public PushStream<T> adjustBackPressure(
ToLongBiFunction<T,Long> adjustment) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext(event -> {
try {
long bp = eventStream.handleEvent(event);
@@ -1268,7 +1327,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Void> forEach(Consumer< ? super T> action) {
- Deferred<Void> d = executors.deferred();
+ Deferred<Void> d = promiseFactory.deferred();
updateNext((event) -> {
try {
switch(event.getType()) {
@@ -1307,7 +1366,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) {
- Deferred<T> d = executors.deferred();
+ Deferred<T> d = promiseFactory.deferred();
AtomicReference<T> iden = new AtomicReference<T>(identity);
updateNext(event -> {
@@ -1336,7 +1395,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) {
- Deferred<Optional<T>> d = executors.deferred();
+ Deferred<Optional<T>> d = promiseFactory.deferred();
AtomicReference<T> iden = new AtomicReference<T>(null);
updateNext(event -> {
@@ -1366,7 +1425,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
- Deferred<U> d = executors.deferred();
+ Deferred<U> d = promiseFactory.deferred();
AtomicReference<U> iden = new AtomicReference<>(identity);
updateNext(event -> {
@@ -1397,7 +1456,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) {
A result = collector.supplier().get();
BiConsumer<A, ? super T> accumulator = collector.accumulator();
- Deferred<R> d = executors.deferred();
+ Deferred<R> d = promiseFactory.deferred();
PushEventConsumer<T> consumer;
if (collector.characteristics()
@@ -1464,7 +1523,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Long> count() {
- Deferred<Long> d = executors.deferred();
+ Deferred<Long> d = promiseFactory.deferred();
LongAdder counter = new LongAdder();
updateNext((event) -> {
try {
@@ -1510,7 +1569,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Optional<T>> findFirst() {
- Deferred<Optional<T>> d = executors.deferred();
+ Deferred<Optional<T>> d = promiseFactory.deferred();
updateNext((event) -> {
try {
Optional<T> o = null;
@@ -1544,7 +1603,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) {
- Deferred<Long> d = executors.deferred();
+ Deferred<Long> d = promiseFactory.deferred();
LongAdder la = new LongAdder();
updateNext((event) -> {
try {
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
index 7a8d1636..dfb9eeea 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
@@ -25,6 +25,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import org.osgi.util.promise.PromiseFactory;
+
class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> {
@@ -46,11 +48,11 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
private final int parallelism;
BufferedPushStreamImpl(PushStreamProvider psp,
- PushStreamExecutors executors, U eventQueue, int parallelism,
+ PromiseFactory promiseFactory, U eventQueue, int parallelism,
QueuePolicy<T,U> queuePolicy,
PushbackPolicy<T,U> pushbackPolicy,
Function<PushEventConsumer<T>,AutoCloseable> connector) {
- super(psp, executors, connector);
+ super(psp, promiseFactory, connector);
this.eventQueue = eventQueue;
this.parallelism = parallelism;
this.semaphore = new Semaphore(parallelism);
@@ -85,7 +87,7 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
}
private void startWorker() {
- executors.execute(() -> {
+ promiseFactory.executor().execute(() -> {
try {
PushEvent< ? extends T> event;
while ((event = eventQueue.poll()) != null) {
@@ -99,7 +101,8 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
close();
return;
} else if(backpressure > 0) {
- executors.schedule(this::startWorker, backpressure,
+ promiseFactory.scheduledExecutor().schedule(
+ this::startWorker, backpressure,
MILLISECONDS);
return;
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
index 67875121..c939c4a6 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
@@ -18,15 +18,17 @@ package org.osgi.util.pushstream;
import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
+import org.osgi.util.promise.PromiseFactory;
+
class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
implements PushStream<T> {
private final AbstractPushStreamImpl< ? > previous;
IntermediatePushStreamImpl(PushStreamProvider psp,
- PushStreamExecutors executors,
+ PromiseFactory promiseFactory,
AbstractPushStreamImpl< ? > previous) {
- super(psp, executors);
+ super(psp, promiseFactory);
this.previous = previous;
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
index 57a3bb99..574d6559 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
@@ -90,7 +90,7 @@ public abstract class PushEvent<T> {
* @throws IllegalStateException if this event is not an
* {@link EventType#ERROR} event.
*/
- public Exception getFailure() throws IllegalStateException {
+ public Throwable getFailure() throws IllegalStateException {
throw new IllegalStateException(
"Not an ERROR event, the event type is " + getType());
}
@@ -119,11 +119,11 @@ public abstract class PushEvent<T> {
* Create a new error event.
*
* @param <T> The payload type.
- * @param e The error.
+ * @param t The error.
* @return A new error event with the specified error.
*/
- public static <T> PushEvent<T> error(Exception e) {
- return new ErrorEvent<T>(e);
+ public static <T> PushEvent<T> error(Throwable t) {
+ return new ErrorEvent<T>(t);
}
/**
@@ -182,14 +182,14 @@ public abstract class PushEvent<T> {
}
static final class ErrorEvent<T> extends PushEvent<T> {
- private final Exception error;
+ private final Throwable error;
- ErrorEvent(Exception error) {
+ ErrorEvent(Throwable error) {
this.error = error;
}
@Override
- public Exception getFailure() {
+ public Throwable getFailure() {
return error;
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
index 6d5953e3..071c9ec7 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
@@ -104,6 +104,26 @@ public interface PushStream<T> extends AutoCloseable {
<R> PushStream<R> map(Function< ? super T, ? extends R> mapper);
/**
+ * Asynchronously map the payload values. The mapping function returns a
+ * Promise representing the asynchronous mapping operation.
+ * <p>
+ * The PushStream limits the number of concurrently running mapping
+ * operations, and returns back pressure based on the number of existing
+ * queued operations.
+ *
+ * @param n number of simultaneous promises to use
+ * @param delay Nr of ms/promise that is queued back pressure
+ * @param mapper The mapping function
+ * @return Builder style (can be a new or the same object)
+ * @throws IllegalArgumentException if the number of threads is &lt; 1 or
+ * the delay is &lt; 0
+ * @throws NullPointerException if the mapper is null
+ */
+ <R> PushStream<R> asyncMap(int n, int delay,
+ Function< ? super T,Promise< ? extends R>> mapper)
+ throws IllegalArgumentException, NullPointerException;
+
+ /**
* Flat map the payload value (turn one event into 0..n events of
* potentially another type).
*
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java
deleted file mode 100644
index 7c2509ee..00000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2017). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.osgi.util.promise.PromiseExecutors;
-
-class PushStreamExecutors extends PromiseExecutors {
- PushStreamExecutors(Executor executor, ScheduledExecutorService scheduler) {
- super(requireNonNull(executor), requireNonNull(scheduler));
- }
-
- void execute(Runnable operation) {
- executor().execute(operation);
- }
-
- ScheduledFuture< ? > schedule(Runnable operation, long delay,
- TimeUnit unit) {
- return scheduledExecutor().schedule(operation, delay, unit);
- }
-
- @Override
- protected Executor executor() {
- return super.executor();
- }
-
- @Override
- protected ScheduledExecutorService scheduledExecutor() {
- return super.scheduledExecutor();
- }
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
index c7d861bf..ecd8bf4c 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
@@ -23,6 +23,7 @@ import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR;
import static org.osgi.util.pushstream.QueuePolicyOption.FAIL;
import java.util.Iterator;
+import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -36,6 +37,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
+import org.osgi.util.promise.PromiseFactory;
+
/**
* A factory for {@link PushStream} instances, and utility methods for handling
* {@link PushEventSource}s and {@link PushEventConsumer}s
@@ -164,7 +167,7 @@ public final class PushStreamProvider {
workerToUse = Executors.newFixedThreadPool(parallelism);
closeExecutorOnClose = true;
} else {
- workerToUse = executor;
+ workerToUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
@@ -174,7 +177,7 @@ public final class PushStreamProvider {
timerToUse = acquireScheduler();
releaseSchedulerOnClose = true;
} else {
- timerToUse = scheduler;
+ timerToUse = Objects.requireNonNull(scheduler);
releaseSchedulerOnClose = false;
}
@@ -191,7 +194,7 @@ public final class PushStreamProvider {
}
PushStream<T> stream = new BufferedPushStreamImpl<>(this,
- new PushStreamExecutors(workerToUse, timerToUse), queue,
+ new PromiseFactory(workerToUse, timerToUse), queue,
parallelism, queuePolicy,
pushbackPolicy, aec -> {
try {
@@ -231,7 +234,7 @@ public final class PushStreamProvider {
workerToUse = Executors.newFixedThreadPool(2);
closeExecutorOnClose = true;
} else {
- workerToUse = executor;
+ workerToUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
@@ -241,11 +244,11 @@ public final class PushStreamProvider {
timerToUse = acquireScheduler();
releaseSchedulerOnClose = true;
} else {
- timerToUse = scheduler;
+ timerToUse = Objects.requireNonNull(scheduler);
releaseSchedulerOnClose = false;
}
PushStream<T> stream = new UnbufferedPushStreamImpl<>(this,
- new PushStreamExecutors(workerToUse, timerToUse),
+ new PromiseFactory(workerToUse, timerToUse),
aec -> {
try {
return eventSource.open(aec);
@@ -533,7 +536,7 @@ public final class PushStreamProvider {
toUse = Executors.newFixedThreadPool(parallelism);
closeExecutorOnClose = true;
} else {
- toUse = executor;
+ toUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
@@ -546,7 +549,7 @@ public final class PushStreamProvider {
}
SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>(
- new PushStreamExecutors(toUse, acquireScheduler()), queuePolicy,
+ new PromiseFactory(toUse, acquireScheduler()), queuePolicy,
queue, parallelism,
() -> {
try {
@@ -712,7 +715,7 @@ public final class PushStreamProvider {
workerToUse = Executors.newFixedThreadPool(2);
closeExecutorOnClose = true;
} else {
- workerToUse = executor;
+ workerToUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
@@ -722,12 +725,12 @@ public final class PushStreamProvider {
timerToUse = acquireScheduler();
releaseSchedulerOnClose = true;
} else {
- timerToUse = scheduler;
+ timerToUse = Objects.requireNonNull(scheduler);
releaseSchedulerOnClose = false;
}
PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>(
- this, new PushStreamExecutors(workerToUse, timerToUse), aec -> {
+ this, new PromiseFactory(workerToUse, timerToUse), aec -> {
return () -> { /* No action to take */ };
}) {
@@ -736,7 +739,7 @@ public final class PushStreamProvider {
if (super.begin()) {
Iterator<T> it = items.iterator();
- executors.execute(() -> pushData(it));
+ promiseFactory.executor().execute(() -> pushData(it));
return true;
}
@@ -753,8 +756,9 @@ public final class PushStreamProvider {
close();
return;
} else {
- executors.schedule(
- () -> executors
+ promiseFactory.scheduledExecutor()
+ .schedule(
+ () -> promiseFactory.executor()
.execute(() -> pushData(it)),
returnValue, MILLISECONDS);
return;
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
index f47bcb1e..314ae083 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
@@ -76,9 +76,9 @@ public interface SimplePushEventSource<T>
* {@link #open(PushEventConsumer)} this source, and will receive subsequent
* events.
*
- * @param e the error
+ * @param t the error
*/
- void error(Exception e);
+ void error(Throwable t);
/**
* Determine whether there are any {@link PushEventConsumer}s for this
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
index 094a580a..478d0e4a 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
@@ -28,15 +28,15 @@ import java.util.concurrent.Semaphore;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
-import org.osgi.util.promise.Promises;
+import org.osgi.util.promise.PromiseFactory;
class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
implements SimplePushEventSource<T> {
private final Object lock = new Object();
- private final PushStreamExecutors executors;
- private final PushStreamExecutors sameThread;
+ private final PromiseFactory promiseFactory;
+ private final PromiseFactory sameThread;
private final QueuePolicy<T,U> queuePolicy;
@@ -57,13 +57,13 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
private boolean waitForFinishes;
- public SimplePushEventSourceImpl(PushStreamExecutors executors,
+ public SimplePushEventSourceImpl(PromiseFactory promiseFactory,
QueuePolicy<T,U> queuePolicy,
U queue, int parallelism, Runnable onClose) {
- this.executors = executors;
- this.sameThread = new PushStreamExecutors(
- PushStreamExecutors.inlineExecutor(),
- executors.scheduledExecutor());
+ this.promiseFactory = promiseFactory;
+ this.sameThread = new PromiseFactory(
+ PromiseFactory.inlineExecutor(),
+ promiseFactory.scheduledExecutor());
this.queuePolicy = queuePolicy;
this.queue = queue;
this.parallelism = parallelism;
@@ -111,7 +111,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) {
try {
- executors.execute(() -> safePush(pec, event));
+ promiseFactory.executor().execute(() -> safePush(pec, event));
} catch (RejectedExecutionException ree) {
// TODO log?
if (!event.isTerminal()) {
@@ -126,7 +126,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
PushEventConsumer< ? super T> pec, PushEvent<T> event) {
Deferred<Long> d = sameThread.deferred();
try {
- executors.execute(
+ promiseFactory.executor().execute(
() -> d.resolve(Long.valueOf(
System.nanoTime() + safePush(pec, event))));
} catch (RejectedExecutionException ree) {
@@ -206,8 +206,8 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
}
@Override
- public void error(Exception e) {
- enqueueEvent(PushEvent.error(e));
+ public void error(Throwable t) {
+ enqueueEvent(PushEvent.error(t));
}
private void enqueueEvent(PushEvent<T> event) {
@@ -237,7 +237,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
"unchecked", "boxing"
})
private void startWorker() {
- executors.execute(() -> {
+ promiseFactory.executor().execute(() -> {
try {
for(;;) {
@@ -287,7 +287,8 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
- System.nanoTime();
if (toWait > 0) {
- executors.schedule(this::startWorker, toWait,
+ promiseFactory.scheduledExecutor().schedule(
+ this::startWorker, toWait,
NANOSECONDS);
return;
}
@@ -297,14 +298,15 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
long toWait = p.getValue() - System.nanoTime();
if (toWait > 0) {
- executors.schedule(this::startWorker, toWait,
+ promiseFactory.scheduledExecutor().schedule(
+ this::startWorker, toWait,
NANOSECONDS);
} else {
startWorker();
}
return p;
}, p -> close(
- PushEvent.error((Exception) p.getFailure())));
+ PushEvent.error(p.getFailure())));
return;
}
}
@@ -346,8 +348,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
return doCall(event, pec);
}
}).collect(toList());
- return Promises
- .all(sameThread.deferred(), calls)
+ return sameThread.all(calls)
.map(l -> l.stream().max(Long::compareTo).orElseGet(
() -> Long.valueOf(System.nanoTime())));
}
@@ -375,17 +376,17 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
if (connected.isEmpty()) {
if (connectPromise == null) {
- connectPromise = executors.deferred();
+ connectPromise = promiseFactory.deferred();
}
return connectPromise.getPromise();
} else {
- return executors.resolved(null);
+ return promiseFactory.resolved(null);
}
}
}
private Promise<Void> closedConnectPromise() {
- return executors.failed(new IllegalStateException(
+ return promiseFactory.failed(new IllegalStateException(
"This SimplePushEventSource is closed"));
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
index 53400c4c..eb3e9335 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
@@ -23,6 +23,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import org.osgi.util.promise.PromiseFactory;
+
class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
extends AbstractPushStreamImpl<T> implements PushStream<T> {
@@ -31,9 +33,9 @@ class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T
protected final AtomicReference<AutoCloseable> upstream = new AtomicReference<AutoCloseable>();
UnbufferedPushStreamImpl(PushStreamProvider psp,
- PushStreamExecutors executors,
+ PromiseFactory promiseFactory,
Function<PushEventConsumer<T>,AutoCloseable> connector) {
- super(psp, executors);
+ super(psp, promiseFactory);
this.connector = connector;
}

Back to the top