diff options
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java')
-rw-r--r-- | bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java | 43 |
1 files changed, 22 insertions, 21 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java index 094a580ab..478d0e4a3 100644 --- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java @@ -28,15 +28,15 @@ import java.util.concurrent.Semaphore; import org.osgi.util.promise.Deferred; import org.osgi.util.promise.Promise; -import org.osgi.util.promise.Promises; +import org.osgi.util.promise.PromiseFactory; class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> implements SimplePushEventSource<T> { private final Object lock = new Object(); - private final PushStreamExecutors executors; - private final PushStreamExecutors sameThread; + private final PromiseFactory promiseFactory; + private final PromiseFactory sameThread; private final QueuePolicy<T,U> queuePolicy; @@ -57,13 +57,13 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends private boolean waitForFinishes; - public SimplePushEventSourceImpl(PushStreamExecutors executors, + public SimplePushEventSourceImpl(PromiseFactory promiseFactory, QueuePolicy<T,U> queuePolicy, U queue, int parallelism, Runnable onClose) { - this.executors = executors; - this.sameThread = new PushStreamExecutors( - PushStreamExecutors.inlineExecutor(), - executors.scheduledExecutor()); + this.promiseFactory = promiseFactory; + this.sameThread = new PromiseFactory( + PromiseFactory.inlineExecutor(), + promiseFactory.scheduledExecutor()); this.queuePolicy = queuePolicy; this.queue = queue; this.parallelism = parallelism; @@ -111,7 +111,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) { try { - executors.execute(() -> safePush(pec, event)); + promiseFactory.executor().execute(() -> safePush(pec, event)); } catch (RejectedExecutionException ree) { // TODO log? if (!event.isTerminal()) { @@ -126,7 +126,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends PushEventConsumer< ? super T> pec, PushEvent<T> event) { Deferred<Long> d = sameThread.deferred(); try { - executors.execute( + promiseFactory.executor().execute( () -> d.resolve(Long.valueOf( System.nanoTime() + safePush(pec, event)))); } catch (RejectedExecutionException ree) { @@ -206,8 +206,8 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends } @Override - public void error(Exception e) { - enqueueEvent(PushEvent.error(e)); + public void error(Throwable t) { + enqueueEvent(PushEvent.error(t)); } private void enqueueEvent(PushEvent<T> event) { @@ -237,7 +237,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends "unchecked", "boxing" }) private void startWorker() { - executors.execute(() -> { + promiseFactory.executor().execute(() -> { try { for(;;) { @@ -287,7 +287,8 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends - System.nanoTime(); if (toWait > 0) { - executors.schedule(this::startWorker, toWait, + promiseFactory.scheduledExecutor().schedule( + this::startWorker, toWait, NANOSECONDS); return; } @@ -297,14 +298,15 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends long toWait = p.getValue() - System.nanoTime(); if (toWait > 0) { - executors.schedule(this::startWorker, toWait, + promiseFactory.scheduledExecutor().schedule( + this::startWorker, toWait, NANOSECONDS); } else { startWorker(); } return p; }, p -> close( - PushEvent.error((Exception) p.getFailure()))); + PushEvent.error(p.getFailure()))); return; } } @@ -346,8 +348,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends return doCall(event, pec); } }).collect(toList()); - return Promises - .all(sameThread.deferred(), calls) + return sameThread.all(calls) .map(l -> l.stream().max(Long::compareTo).orElseGet( () -> Long.valueOf(System.nanoTime()))); } @@ -375,17 +376,17 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends if (connected.isEmpty()) { if (connectPromise == null) { - connectPromise = executors.deferred(); + connectPromise = promiseFactory.deferred(); } return connectPromise.getPromise(); } else { - return executors.resolved(null); + return promiseFactory.resolved(null); } } } private Promise<Void> closedConnectPromise() { - return executors.failed(new IllegalStateException( + return promiseFactory.failed(new IllegalStateException( "This SimplePushEventSource is closed")); } |