Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java39
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java68
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java5
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java8
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java3
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java22
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java3
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java2
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java17
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java31
-rw-r--r--bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java2
11 files changed, 139 insertions, 61 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java
index 99f5645c9..f48266bc6 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) OSGi Alliance (2016). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2016, 2017). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,10 +19,11 @@ package org.osgi.service.log.stream;
import org.osgi.annotation.versioning.ProviderType;
import org.osgi.service.log.LogEntry;
import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.QueuePolicyOption;
/**
- * LogStreamProvider service for creating a PushStream of {@link LogEntry}
- * objects.
+ * LogStreamProvider service for creating a {@link PushStream} of
+ * {@link LogEntry} objects.
*
* @ThreadSafe
* @author $Id$
@@ -30,34 +31,42 @@ import org.osgi.util.pushstream.PushStream;
@ProviderType
public interface LogStreamProvider {
/**
- * Creation options for the PushStream of {@link LogEntry} objects.
+ * Creation options for the {@link PushStream} of {@link LogEntry} objects.
*/
enum Options {
/**
* Include history.
* <p>
- * Prime the created PushStream with the past {@link LogEntry} objects.
- * The number of past {@link LogEntry} objects is implementation
- * specific.
+ * Prime the created PushStream with the available historical
+ * {@link LogEntry} objects. The number of available {@link LogEntry}
+ * objects is implementation specific.
* <p>
- * The created PushStream will supply the past {@link LogEntry} objects
- * followed by newly created {@link LogEntry} objects.
+ * The created PushStream will supply the available historical
+ * {@link LogEntry} objects followed by newly created {@link LogEntry}
+ * objects.
*/
HISTORY;
}
/**
- * Create a PushStream of {@link LogEntry} objects.
+ * Create a {@link PushStream} of {@link LogEntry} objects.
* <p>
- * The returned PushStream is an unbuffered stream with a parallelism of
- * one.
+ * The returned PushStream must:
+ * <ul>
+ * <li>Be buffered with a buffer large enough to contain the history, if
+ * included.</li>
+ * <li>Have the {@link QueuePolicyOption#DISCARD_OLDEST} queue policy
+ * option.</li>
+ * <li>Use a shared executor.</li>
+ * <li>Have a parallelism of one.</li>
+ * </ul>
* <p>
* When this LogStreamProvider service is released by the obtaining bundle,
- * this LogStreamProvider service must call {@code close()} on the returned
- * PushStream object if it has not already been closed.
+ * this LogStreamProvider service must call {@link PushStream#close()} on
+ * the returned PushStream object if it has not already been closed.
*
* @param options The options to use when creating the PushStream.
- * @return A PushStream of {@link LogEntry} objects.
+ * @return A {@link PushStream} of {@link LogEntry} objects.
*/
PushStream<LogEntry> createStream(Options... options);
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
index 2293c1aad..d64ac129d 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
@@ -37,14 +37,14 @@ import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
+import org.osgi.util.function.Function;
+import org.osgi.util.function.Predicate;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.TimeoutException;
@@ -52,6 +52,8 @@ import org.osgi.util.pushstream.PushEvent.EventType;
abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
+ private final Function<T,T> IDENTITY = x -> x;
+
static enum State {
BUILDING, STARTED, CLOSED
}
@@ -70,6 +72,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
protected abstract boolean begin();
+ protected abstract void upstreamClose(PushEvent< ? > close);
+
AbstractPushStreamImpl(PushStreamProvider psp,
Executor executor, ScheduledExecutorService scheduler) {
this.psp = psp;
@@ -107,16 +111,23 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public void close() {
- close(PushEvent.close());
+ PushEvent<T> close = PushEvent.close();
+ if (close(close, true)) {
+ upstreamClose(close);
+ }
}
protected boolean close(PushEvent<T> event) {
+ return close(event, true);
+ }
+
+ protected boolean close(PushEvent<T> event, boolean sendDownStreamEvent) {
if(!event.isTerminal()) {
throw new IllegalArgumentException("The event " + event + " is not a close event.");
}
if(closed.getAndSet(CLOSED) != CLOSED) {
PushEventConsumer<T> aec = next.getAndSet(null);
- if(aec != null) {
+ if (sendDownStreamEvent && aec != null) {
try {
aec.accept(event);
} catch (Exception e) {
@@ -411,7 +422,10 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
scheduler.schedule(() -> check(lastTime, timeout),
timeout - elapsed, NANOSECONDS);
} else {
- close(PushEvent.error(new TimeoutException()));
+ PushEvent<T> error = PushEvent.error(new TimeoutException());
+ close(error);
+ // Upstream close is needed as we have no direct backpressure
+ upstreamClose(error);
}
}
@@ -461,10 +475,18 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
ex.execute(() -> {
try {
if (eventStream.handleEvent(event) < 0) {
- eventStream.close(PushEvent.close());
+ PushEvent<T> close = PushEvent.close();
+ eventStream.close(close);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(close);
}
} catch (Exception e1) {
- close(PushEvent.error(e1));
+ PushEvent<T> error = PushEvent.error(e1);
+ close(error);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(error);
} finally {
s.release(1);
}
@@ -541,7 +563,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
// TODO Auto-generated catch block
e.printStackTrace();
}
- }).map(Function.identity());
+ }).map(IDENTITY);
}
@Override
@@ -591,6 +613,12 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
}
return false;
}
+
+ @Override
+ protected void upstreamClose(PushEvent< ? > close) {
+ AbstractPushStreamImpl.this.upstreamClose(close);
+ source.close();
+ }
};
@@ -607,7 +635,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
// TODO Auto-generated catch block
e.printStackTrace();
}
- }).map(Function.identity());
+ }).map(IDENTITY);
}
@SuppressWarnings("unchecked")
@@ -803,7 +831,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
private <R> long aggregateAndForward(Function<Collection<T>,R> f,
AbstractPushStreamImpl<R> eventStream,
- PushEvent< ? extends T> event, Queue<T> queue) {
+ PushEvent< ? extends T> event, Queue<T> queue) throws Exception {
if (!queue.offer(event.getData())) {
((ArrayQueue<T>) queue).forcePush(event.getData());
}
@@ -820,7 +848,13 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public <R> PushStream<R> window(Duration time, Executor executor,
Function<Collection<T>,R> f) {
- return window(() -> time, () -> 0, executor, (t, c) -> f.apply(c));
+ return window(() -> time, () -> 0, executor, (t, c) -> {
+ try {
+ return f.apply(c);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
}
@Override
@@ -946,7 +980,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
.toMillis(elapsed)),
collected)));
} catch (Exception e) {
- close(PushEvent.error(e));
+ PushEvent<T> error = PushEvent.error(e);
+ close(error);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(error);
}
});
}
@@ -1126,7 +1164,11 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
Long.valueOf(NANOSECONDS.toMillis(elapsed)),
collected)));
} catch (Exception e) {
- close(PushEvent.error(e));
+ PushEvent<T> error = PushEvent.error(e);
+ close(error);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(error);
}
});
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
index 2aa6ec763..dfcf8b503 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
@@ -3,6 +3,8 @@ package org.osgi.util.pushstream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
+import org.osgi.annotation.versioning.ProviderType;
+
/**
* Create a buffered section of a Push-based stream
*
@@ -10,6 +12,7 @@ import java.util.concurrent.Executor;
* @param <T> The type of objects in the {@link PushEvent}
* @param <U> The type of the Queue used in the user specified buffer
*/
+@ProviderType
public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends T>>> {
/**
@@ -74,6 +77,6 @@ public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends
/**
* @return the object being built
*/
- R create();
+ R build();
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
index 3a4da2fd9..320adeebc 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
@@ -31,5 +31,13 @@ class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
// The base implementation has nothing to do, but
// this method is used in windowing
}
+
+ @Override
+ protected void upstreamClose(PushEvent< ? > close) {
+ if (closed.get() != CLOSED) {
+ close(close.nodata(), false);
+ }
+ previous.upstreamClose(close);
+ }
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
index 028f0a392..968f668f6 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
@@ -18,6 +18,8 @@ package org.osgi.util.pushstream;
import static org.osgi.util.pushstream.PushEvent.EventType.*;
+import org.osgi.annotation.versioning.ProviderType;
+
/**
* A PushEvent is an immutable object that is transferred through a
* communication channel to push information to a downstream consumer. The event
@@ -35,6 +37,7 @@ import static org.osgi.util.pushstream.PushEvent.EventType.*;
* @param <T> The payload type of the event.
* @Immutable
*/
+@ProviderType
public abstract class PushEvent<T> {
/**
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
index c26bc8c4d..1471a84f9 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
@@ -25,14 +25,14 @@ import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;
import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.function.Function;
+import org.osgi.util.function.Predicate;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.TimeoutException;
@@ -56,6 +56,16 @@ import org.osgi.util.promise.TimeoutException;
public interface PushStream<T> extends AutoCloseable {
/**
+ * Close this PushStream by sending an event of type
+ * {@link PushEvent.EventType#CLOSE} downstream. Closing a PushStream is a
+ * safe operation that will not throw an Exception.
+ * <p>
+ * Calling <code>close()</code> on a closed PushStream has no effect.
+ */
+ @Override
+ void close();
+
+ /**
* Must be run after the channel is closed. This handler will run after the
* downstream methods have processed the close event and before the upstream
* methods have closed.
@@ -222,12 +232,8 @@ public interface PushStream<T> extends AutoCloseable {
* {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger
* the stream to close, preventing an event storm from reaching the client.
*
- * @param parallelism
- * @param executor
- * @param queue
- * @param queuePolicy
- * @param pushbackPolicy
- * @return Builder style (can be a new or the same object)
+ * @return A builder which can be used to configure the buffer for this
+ * pipeline stage.
*/
<U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer();
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
index d59c8d9d3..506c8f2e7 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
@@ -3,6 +3,8 @@ package org.osgi.util.pushstream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
+import org.osgi.annotation.versioning.ProviderType;
+
/**
* A Builder for a PushStream. This Builder extends the support of a standard
* BufferBuilder by allowing the PushStream to be unbuffered.
@@ -11,6 +13,7 @@ import java.util.concurrent.Executor;
* @param <T> The type of objects in the {@link PushEvent}
* @param <U> The type of the Queue used in the user specified buffer
*/
+@ProviderType
public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? extends T>>>
extends BufferBuilder<PushStream<T>,T,U> {
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
index 5ec7cb336..0bf8d7127 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
@@ -77,7 +77,7 @@ class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
}
@Override
- public PushStream<T> create() {
+ public PushStream<T> build() {
if (unbuffered) {
return psp.createUnbufferedStream(eventSource, previousExecutor);
} else {
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
index be87c6bce..f63c66210 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
import java.util.stream.Stream;
/**
@@ -193,7 +192,7 @@ public final class PushStreamProvider {
((ExecutorService) toUse).shutdown();
}
releaseScheduler();
- }).map(Function.identity());
+ }).map(x -> x);
return stream;
}
@@ -226,7 +225,7 @@ public final class PushStreamProvider {
((ExecutorService) toUse).shutdown();
}
releaseScheduler();
- }).map(Function.identity());
+ }).map(x -> x);
return stream;
}
@@ -249,7 +248,7 @@ public final class PushStreamProvider {
*/
public <T> PushEventSource<T> createEventSourceFromStream(
PushStream<T> stream) {
- return buildEventSourceFromStream(stream).create();
+ return buildEventSourceFromStream(stream).build();
}
/**
@@ -269,7 +268,7 @@ public final class PushStreamProvider {
PushStream<T> stream) {
return new AbstractBufferBuilder<PushEventSource<T>,T,U>() {
@Override
- public PushEventSource<T> create() {
+ public PushEventSource<T> build() {
SimplePushEventSource<T> spes = createSimplePushEventSource(
concurrency, worker, buffer, bufferingPolicy, () -> {
try {
@@ -321,7 +320,7 @@ public final class PushStreamProvider {
Class<T> type) {
return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() {
@Override
- public SimplePushEventSource<T> create() {
+ public SimplePushEventSource<T> build() {
return createSimplePushEventSource(concurrency, worker, buffer,
bufferingPolicy, () -> { /* Nothing else to do */ });
}
@@ -403,7 +402,7 @@ public final class PushStreamProvider {
*/
public <T> PushEventConsumer<T> createBufferedConsumer(
PushEventConsumer<T> delegate) {
- return buildBufferedConsumer(delegate).create();
+ return buildBufferedConsumer(delegate).build();
}
/**
@@ -435,7 +434,7 @@ public final class PushStreamProvider {
PushEventConsumer<T> delegate) {
return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() {
@Override
- public PushEventConsumer<T> create() {
+ public PushEventConsumer<T> build() {
PushEventPipe<T> pipe = new PushEventPipe<>();
createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure)
@@ -574,7 +573,7 @@ public final class PushStreamProvider {
((ExecutorService) toUse).shutdown();
}
releaseScheduler();
- }).map(Function.identity());
+ }).map(x -> x);
return stream;
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
index faf9e6584..d6116e34e 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
@@ -24,25 +24,30 @@ class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T
}
@Override
- protected boolean close(PushEvent<T> event) {
- if(super.close(event)) {
- ofNullable(upstream.getAndSet(() -> {
- // This block doesn't need to do anything, but the presence
- // of the Closable is needed to prevent duplicate begins
- })).ifPresent(c -> {
- try {
- c.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- });
+ protected boolean close(PushEvent<T> event, boolean sendDownStreamEvent) {
+ if (super.close(event, sendDownStreamEvent)) {
+ upstreamClose(event);
return true;
}
return false;
}
@Override
+ protected void upstreamClose(PushEvent< ? > close) {
+ ofNullable(upstream.getAndSet(() -> {
+ // This block doesn't need to do anything, but the presence
+ // of the Closable is needed to prevent duplicate begins
+ })).ifPresent(c -> {
+ try {
+ c.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ });
+ }
+
+ @Override
protected boolean begin() {
if(closed.compareAndSet(BUILDING, STARTED)) {
AutoCloseable toClose = connector.apply(this::handleEvent);
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
index e32bf225e..7c52b5afe 100644
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
@@ -60,7 +60,7 @@ public class LogStreamProviderImpl implements LogStreamProvider {
LogEntrySource logEntrySource = new LogEntrySource(withHistory);
PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource);
//creating an unbuffered stream
- PushStream<LogEntry> logStream = streamBuilder.unbuffered().create();
+ PushStream<LogEntry> logStream = streamBuilder.unbuffered().build();
logEntrySource.setLogStream(logStream);
// Adding to sources makes the source start listening for new entries
logEntrySources.add(logEntrySource);

Back to the top