Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java')
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java43
1 files changed, 22 insertions, 21 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
index 094a580ab..478d0e4a3 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
@@ -28,15 +28,15 @@ import java.util.concurrent.Semaphore;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
-import org.osgi.util.promise.Promises;
+import org.osgi.util.promise.PromiseFactory;
class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
implements SimplePushEventSource<T> {
private final Object lock = new Object();
- private final PushStreamExecutors executors;
- private final PushStreamExecutors sameThread;
+ private final PromiseFactory promiseFactory;
+ private final PromiseFactory sameThread;
private final QueuePolicy<T,U> queuePolicy;
@@ -57,13 +57,13 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
private boolean waitForFinishes;
- public SimplePushEventSourceImpl(PushStreamExecutors executors,
+ public SimplePushEventSourceImpl(PromiseFactory promiseFactory,
QueuePolicy<T,U> queuePolicy,
U queue, int parallelism, Runnable onClose) {
- this.executors = executors;
- this.sameThread = new PushStreamExecutors(
- PushStreamExecutors.inlineExecutor(),
- executors.scheduledExecutor());
+ this.promiseFactory = promiseFactory;
+ this.sameThread = new PromiseFactory(
+ PromiseFactory.inlineExecutor(),
+ promiseFactory.scheduledExecutor());
this.queuePolicy = queuePolicy;
this.queue = queue;
this.parallelism = parallelism;
@@ -111,7 +111,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) {
try {
- executors.execute(() -> safePush(pec, event));
+ promiseFactory.executor().execute(() -> safePush(pec, event));
} catch (RejectedExecutionException ree) {
// TODO log?
if (!event.isTerminal()) {
@@ -126,7 +126,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
PushEventConsumer< ? super T> pec, PushEvent<T> event) {
Deferred<Long> d = sameThread.deferred();
try {
- executors.execute(
+ promiseFactory.executor().execute(
() -> d.resolve(Long.valueOf(
System.nanoTime() + safePush(pec, event))));
} catch (RejectedExecutionException ree) {
@@ -206,8 +206,8 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
}
@Override
- public void error(Exception e) {
- enqueueEvent(PushEvent.error(e));
+ public void error(Throwable t) {
+ enqueueEvent(PushEvent.error(t));
}
private void enqueueEvent(PushEvent<T> event) {
@@ -237,7 +237,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
"unchecked", "boxing"
})
private void startWorker() {
- executors.execute(() -> {
+ promiseFactory.executor().execute(() -> {
try {
for(;;) {
@@ -287,7 +287,8 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
- System.nanoTime();
if (toWait > 0) {
- executors.schedule(this::startWorker, toWait,
+ promiseFactory.scheduledExecutor().schedule(
+ this::startWorker, toWait,
NANOSECONDS);
return;
}
@@ -297,14 +298,15 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
long toWait = p.getValue() - System.nanoTime();
if (toWait > 0) {
- executors.schedule(this::startWorker, toWait,
+ promiseFactory.scheduledExecutor().schedule(
+ this::startWorker, toWait,
NANOSECONDS);
} else {
startWorker();
}
return p;
}, p -> close(
- PushEvent.error((Exception) p.getFailure())));
+ PushEvent.error(p.getFailure())));
return;
}
}
@@ -346,8 +348,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
return doCall(event, pec);
}
}).collect(toList());
- return Promises
- .all(sameThread.deferred(), calls)
+ return sameThread.all(calls)
.map(l -> l.stream().max(Long::compareTo).orElseGet(
() -> Long.valueOf(System.nanoTime())));
}
@@ -375,17 +376,17 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
if (connected.isEmpty()) {
if (connectPromise == null) {
- connectPromise = executors.deferred();
+ connectPromise = promiseFactory.deferred();
}
return connectPromise.getPromise();
} else {
- return executors.resolved(null);
+ return promiseFactory.resolved(null);
}
}
}
private Promise<Void> closedConnectPromise() {
- return executors.failed(new IllegalStateException(
+ return promiseFactory.failed(new IllegalStateException(
"This SimplePushEventSource is closed"));
}

Back to the top