Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java133
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java11
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java6
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java14
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java20
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java51
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java32
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java4
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java43
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java6
10 files changed, 180 insertions, 140 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
index ef8bb1420..cf16c2125 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
@@ -33,6 +33,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
@@ -64,6 +65,7 @@ import org.osgi.util.function.Function;
import org.osgi.util.function.Predicate;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.promise.TimeoutException;
import org.osgi.util.pushstream.PushEvent.EventType;
@@ -77,7 +79,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
protected final PushStreamProvider psp;
- protected final PushStreamExecutors executors;
+ protected final PromiseFactory promiseFactory;
protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING);
@@ -91,9 +93,9 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
protected abstract void upstreamClose(PushEvent< ? > close);
AbstractPushStreamImpl(PushStreamProvider psp,
- PushStreamExecutors executors) {
+ PromiseFactory promiseFactory) {
this.psp = psp;
- this.executors = executors;
+ this.promiseFactory = promiseFactory;
}
protected long handleEvent(PushEvent< ? extends T> event) {
@@ -216,7 +218,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> filter(Predicate< ? super T> predicate) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext((event) -> {
try {
if (!event.isTerminal()) {
@@ -239,7 +241,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) {
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext(event -> {
try {
if (!event.isTerminal()) {
@@ -257,10 +259,62 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
}
@Override
+ public <R> PushStream<R> asyncMap(int n, int delay,
+ Function< ? super T,Promise< ? extends R>> mapper)
+ throws IllegalArgumentException, NullPointerException {
+
+ AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+ psp, promiseFactory, this);
+ Semaphore s = new Semaphore(n);
+ updateNext(event -> {
+ try {
+ if (event.isTerminal()) {
+ s.acquire(n);
+ eventStream.close(event.nodata());
+ return ABORT;
+ }
+
+ s.acquire(1);
+
+ Promise< ? extends R> p = mapper.apply(event.getData());
+ p.thenAccept(d -> promiseFactory.executor().execute(() -> {
+ try {
+ if (eventStream
+ .handleEvent(PushEvent.data(d)) < 0) {
+ PushEvent<R> close = PushEvent.close();
+ eventStream.close(close);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(close);
+ }
+ } finally {
+ s.release();
+ }
+ })).onFailure(t -> promiseFactory.executor().execute(() -> {
+ PushEvent<T> error = PushEvent.error(t);
+ close(error);
+ // Upstream close is needed as we have no direct
+ // backpressure
+ upstreamClose(error);
+ }));
+
+ // The number active before was one less than the active number
+ int activePromises = Math.max(0, n - s.availablePermits() - 1);
+ return (activePromises + s.getQueueLength()) * delay;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+
+ return eventStream;
+ }
+
+ @Override
public <R> PushStream<R> flatMap(
Function< ? super T, ? extends PushStream< ? extends R>> mapper) {
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
PushEventConsumer<R> consumer = e -> {
switch (e.getType()) {
@@ -320,7 +374,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public PushStream<T> sorted(Comparator< ? super T> comparator) {
List<T> list = Collections.synchronizedList(new ArrayList<>());
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext(event -> {
try {
switch(event.getType()) {
@@ -355,7 +409,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
throw new IllegalArgumentException("The limit must be greater than zero");
}
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
AtomicLong counter = new AtomicLong(maxSize);
updateNext(event -> {
try {
@@ -381,11 +435,12 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> limit(Duration maxTime) {
- Runnable start = () -> executors.schedule(() -> close(),
+ Runnable start = () -> promiseFactory.scheduledExecutor().schedule(
+ () -> close(),
maxTime.toNanos(), NANOSECONDS);
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
- psp, executors, this) {
+ psp, promiseFactory, this) {
@Override
protected void beginning() {
start.run();
@@ -409,11 +464,12 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long timeout = maxTime.toNanos();
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
- psp, executors, this) {
+ psp, promiseFactory, this) {
@Override
protected void beginning() {
lastTime.set(System.nanoTime());
- executors.schedule(() -> check(lastTime, timeout), timeout,
+ promiseFactory.scheduledExecutor().schedule(
+ () -> check(lastTime, timeout), timeout,
NANOSECONDS);
}
};
@@ -434,7 +490,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long elapsed = now - lastTime.get();
if (elapsed < timeout) {
- executors.schedule(() -> check(lastTime, timeout),
+ promiseFactory.scheduledExecutor().schedule(
+ () -> check(lastTime, timeout),
timeout - elapsed, NANOSECONDS);
} else {
PushEvent<T> error = PushEvent.error(new TimeoutException());
@@ -451,7 +508,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
"The number to skip must be greater than or equal to zero");
}
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
AtomicLong counter = new AtomicLong(n);
updateNext(event -> {
try {
@@ -475,7 +532,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> fork(int n, int delay, Executor ex) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, new PushStreamExecutors(ex, executors.scheduledExecutor()),
+ psp, new PromiseFactory(Objects.requireNonNull(ex),
+ promiseFactory.scheduledExecutor()),
this);
Semaphore s = new Semaphore(n);
updateNext(event -> {
@@ -538,7 +596,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public PushStream<T> merge(
PushEventSource< ? extends T> source) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
AtomicInteger count = new AtomicInteger(2);
PushEventConsumer<T> consumer = event -> {
try {
@@ -620,7 +678,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@SuppressWarnings("resource")
AbstractPushStreamImpl<T> eventStream = new AbstractPushStreamImpl<T>(
- psp, executors) {
+ psp, promiseFactory) {
@Override
protected boolean begin() {
if (closed.compareAndSet(BUILDING, STARTED)) {
@@ -660,7 +718,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length);
AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length];
for(int i = 0; i < tests.length; i++) {
- rsult[i] = new IntermediatePushStreamImpl<>(psp, executors, this);
+ rsult[i] = new IntermediatePushStreamImpl<>(psp, promiseFactory, this);
}
Boolean[] array = new Boolean[tests.length];
@@ -716,7 +774,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> sequential() {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
Lock lock = new ReentrantLock();
updateNext((event) -> {
try {
@@ -738,7 +796,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R> PushStream<R> coalesce(
Function< ? super T,Optional<R>> accumulator) {
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext((event) -> {
try {
if (!event.isTerminal()) {
@@ -785,7 +843,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@SuppressWarnings("resource")
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
- psp, executors, this) {
+ psp, promiseFactory, this) {
@Override
protected void beginning() {
init.run();
@@ -857,7 +915,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public <R> PushStream<R> window(Duration time,
Function<Collection<T>,R> f) {
- return window(time, executors.executor(), f);
+ return window(time, promiseFactory.executor(), f);
}
@Override
@@ -876,7 +934,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R> PushStream<R> window(Supplier<Duration> time,
IntSupplier maxEvents,
BiFunction<Long,Collection<T>,R> f) {
- return window(time, maxEvents, executors.executor(), f);
+ return window(time, maxEvents, promiseFactory.executor(), f);
}
@Override
@@ -902,7 +960,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long windowSize = time.get().toNanos();
previousWindowSize.set(windowSize);
- executors.schedule(
+ promiseFactory.scheduledExecutor().schedule(
getWindowTask(p, f, time, maxEvents, lock, count,
queueRef, timestamp, counter,
previousWindowSize, ex),
@@ -914,7 +972,8 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@SuppressWarnings("resource")
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
- psp, new PushStreamExecutors(ex, executors.scheduledExecutor()),
+ psp, new PromiseFactory(Objects.requireNonNull(ex),
+ promiseFactory.scheduledExecutor()),
this) {
@Override
protected void beginning() {
@@ -968,7 +1027,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long nextWindow = time.get().toNanos();
long backpressure = previousWindowSize.getAndSet(nextWindow)
- elapsed;
- executors.schedule(
+ promiseFactory.scheduledExecutor().schedule(
getWindowTask(eventStream, f, time, maxEvents, lock,
newCount, queueRef, timestamp, counter,
previousWindowSize, ex),
@@ -1194,7 +1253,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
long nextWindow = time.get().toNanos();
previousWindowSize.set(nextWindow);
queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
- executors.schedule(
+ promiseFactory.scheduledExecutor().schedule(
getWindowTask(eventStream, f, time, maxEvents, lock,
expectedCounter + 1, queueRef, timestamp, counter,
previousWindowSize, executor),
@@ -1226,7 +1285,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public PushStream<T> adjustBackPressure(LongUnaryOperator adjustment) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext(event -> {
try {
long bp = eventStream.handleEvent(event);
@@ -1247,7 +1306,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public PushStream<T> adjustBackPressure(
ToLongBiFunction<T,Long> adjustment) {
AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, executors, this);
+ psp, promiseFactory, this);
updateNext(event -> {
try {
long bp = eventStream.handleEvent(event);
@@ -1268,7 +1327,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Void> forEach(Consumer< ? super T> action) {
- Deferred<Void> d = executors.deferred();
+ Deferred<Void> d = promiseFactory.deferred();
updateNext((event) -> {
try {
switch(event.getType()) {
@@ -1307,7 +1366,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) {
- Deferred<T> d = executors.deferred();
+ Deferred<T> d = promiseFactory.deferred();
AtomicReference<T> iden = new AtomicReference<T>(identity);
updateNext(event -> {
@@ -1336,7 +1395,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) {
- Deferred<Optional<T>> d = executors.deferred();
+ Deferred<Optional<T>> d = promiseFactory.deferred();
AtomicReference<T> iden = new AtomicReference<T>(null);
updateNext(event -> {
@@ -1366,7 +1425,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
- Deferred<U> d = executors.deferred();
+ Deferred<U> d = promiseFactory.deferred();
AtomicReference<U> iden = new AtomicReference<>(identity);
updateNext(event -> {
@@ -1397,7 +1456,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) {
A result = collector.supplier().get();
BiConsumer<A, ? super T> accumulator = collector.accumulator();
- Deferred<R> d = executors.deferred();
+ Deferred<R> d = promiseFactory.deferred();
PushEventConsumer<T> consumer;
if (collector.characteristics()
@@ -1464,7 +1523,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Long> count() {
- Deferred<Long> d = executors.deferred();
+ Deferred<Long> d = promiseFactory.deferred();
LongAdder counter = new LongAdder();
updateNext((event) -> {
try {
@@ -1510,7 +1569,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Optional<T>> findFirst() {
- Deferred<Optional<T>> d = executors.deferred();
+ Deferred<Optional<T>> d = promiseFactory.deferred();
updateNext((event) -> {
try {
Optional<T> o = null;
@@ -1544,7 +1603,7 @@ abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
@Override
public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) {
- Deferred<Long> d = executors.deferred();
+ Deferred<Long> d = promiseFactory.deferred();
LongAdder la = new LongAdder();
updateNext((event) -> {
try {
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
index 7a8d1636e..dfb9eeeaa 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
@@ -25,6 +25,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import org.osgi.util.promise.PromiseFactory;
+
class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> {
@@ -46,11 +48,11 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
private final int parallelism;
BufferedPushStreamImpl(PushStreamProvider psp,
- PushStreamExecutors executors, U eventQueue, int parallelism,
+ PromiseFactory promiseFactory, U eventQueue, int parallelism,
QueuePolicy<T,U> queuePolicy,
PushbackPolicy<T,U> pushbackPolicy,
Function<PushEventConsumer<T>,AutoCloseable> connector) {
- super(psp, executors, connector);
+ super(psp, promiseFactory, connector);
this.eventQueue = eventQueue;
this.parallelism = parallelism;
this.semaphore = new Semaphore(parallelism);
@@ -85,7 +87,7 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
}
private void startWorker() {
- executors.execute(() -> {
+ promiseFactory.executor().execute(() -> {
try {
PushEvent< ? extends T> event;
while ((event = eventQueue.poll()) != null) {
@@ -99,7 +101,8 @@ class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>
close();
return;
} else if(backpressure > 0) {
- executors.schedule(this::startWorker, backpressure,
+ promiseFactory.scheduledExecutor().schedule(
+ this::startWorker, backpressure,
MILLISECONDS);
return;
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
index 678751215..c939c4a6e 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
@@ -18,15 +18,17 @@ package org.osgi.util.pushstream;
import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
+import org.osgi.util.promise.PromiseFactory;
+
class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
implements PushStream<T> {
private final AbstractPushStreamImpl< ? > previous;
IntermediatePushStreamImpl(PushStreamProvider psp,
- PushStreamExecutors executors,
+ PromiseFactory promiseFactory,
AbstractPushStreamImpl< ? > previous) {
- super(psp, executors);
+ super(psp, promiseFactory);
this.previous = previous;
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
index 57a3bb99a..574d65590 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
@@ -90,7 +90,7 @@ public abstract class PushEvent<T> {
* @throws IllegalStateException if this event is not an
* {@link EventType#ERROR} event.
*/
- public Exception getFailure() throws IllegalStateException {
+ public Throwable getFailure() throws IllegalStateException {
throw new IllegalStateException(
"Not an ERROR event, the event type is " + getType());
}
@@ -119,11 +119,11 @@ public abstract class PushEvent<T> {
* Create a new error event.
*
* @param <T> The payload type.
- * @param e The error.
+ * @param t The error.
* @return A new error event with the specified error.
*/
- public static <T> PushEvent<T> error(Exception e) {
- return new ErrorEvent<T>(e);
+ public static <T> PushEvent<T> error(Throwable t) {
+ return new ErrorEvent<T>(t);
}
/**
@@ -182,14 +182,14 @@ public abstract class PushEvent<T> {
}
static final class ErrorEvent<T> extends PushEvent<T> {
- private final Exception error;
+ private final Throwable error;
- ErrorEvent(Exception error) {
+ ErrorEvent(Throwable error) {
this.error = error;
}
@Override
- public Exception getFailure() {
+ public Throwable getFailure() {
return error;
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
index 6d5953e34..071c9ec7d 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
@@ -104,6 +104,26 @@ public interface PushStream<T> extends AutoCloseable {
<R> PushStream<R> map(Function< ? super T, ? extends R> mapper);
/**
+ * Asynchronously map the payload values. The mapping function returns a
+ * Promise representing the asynchronous mapping operation.
+ * <p>
+ * The PushStream limits the number of concurrently running mapping
+ * operations, and returns back pressure based on the number of existing
+ * queued operations.
+ *
+ * @param n number of simultaneous promises to use
+ * @param delay Nr of ms/promise that is queued back pressure
+ * @param mapper The mapping function
+ * @return Builder style (can be a new or the same object)
+ * @throws IllegalArgumentException if the number of threads is &lt; 1 or
+ * the delay is &lt; 0
+ * @throws NullPointerException if the mapper is null
+ */
+ <R> PushStream<R> asyncMap(int n, int delay,
+ Function< ? super T,Promise< ? extends R>> mapper)
+ throws IllegalArgumentException, NullPointerException;
+
+ /**
* Flat map the payload value (turn one event into 0..n events of
* potentially another type).
*
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java
deleted file mode 100644
index 7c2509eef..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2017). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.osgi.util.promise.PromiseExecutors;
-
-class PushStreamExecutors extends PromiseExecutors {
- PushStreamExecutors(Executor executor, ScheduledExecutorService scheduler) {
- super(requireNonNull(executor), requireNonNull(scheduler));
- }
-
- void execute(Runnable operation) {
- executor().execute(operation);
- }
-
- ScheduledFuture< ? > schedule(Runnable operation, long delay,
- TimeUnit unit) {
- return scheduledExecutor().schedule(operation, delay, unit);
- }
-
- @Override
- protected Executor executor() {
- return super.executor();
- }
-
- @Override
- protected ScheduledExecutorService scheduledExecutor() {
- return super.scheduledExecutor();
- }
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
index c7d861bf3..ecd8bf4c4 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
@@ -23,6 +23,7 @@ import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR;
import static org.osgi.util.pushstream.QueuePolicyOption.FAIL;
import java.util.Iterator;
+import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -36,6 +37,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
+import org.osgi.util.promise.PromiseFactory;
+
/**
* A factory for {@link PushStream} instances, and utility methods for handling
* {@link PushEventSource}s and {@link PushEventConsumer}s
@@ -164,7 +167,7 @@ public final class PushStreamProvider {
workerToUse = Executors.newFixedThreadPool(parallelism);
closeExecutorOnClose = true;
} else {
- workerToUse = executor;
+ workerToUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
@@ -174,7 +177,7 @@ public final class PushStreamProvider {
timerToUse = acquireScheduler();
releaseSchedulerOnClose = true;
} else {
- timerToUse = scheduler;
+ timerToUse = Objects.requireNonNull(scheduler);
releaseSchedulerOnClose = false;
}
@@ -191,7 +194,7 @@ public final class PushStreamProvider {
}
PushStream<T> stream = new BufferedPushStreamImpl<>(this,
- new PushStreamExecutors(workerToUse, timerToUse), queue,
+ new PromiseFactory(workerToUse, timerToUse), queue,
parallelism, queuePolicy,
pushbackPolicy, aec -> {
try {
@@ -231,7 +234,7 @@ public final class PushStreamProvider {
workerToUse = Executors.newFixedThreadPool(2);
closeExecutorOnClose = true;
} else {
- workerToUse = executor;
+ workerToUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
@@ -241,11 +244,11 @@ public final class PushStreamProvider {
timerToUse = acquireScheduler();
releaseSchedulerOnClose = true;
} else {
- timerToUse = scheduler;
+ timerToUse = Objects.requireNonNull(scheduler);
releaseSchedulerOnClose = false;
}
PushStream<T> stream = new UnbufferedPushStreamImpl<>(this,
- new PushStreamExecutors(workerToUse, timerToUse),
+ new PromiseFactory(workerToUse, timerToUse),
aec -> {
try {
return eventSource.open(aec);
@@ -533,7 +536,7 @@ public final class PushStreamProvider {
toUse = Executors.newFixedThreadPool(parallelism);
closeExecutorOnClose = true;
} else {
- toUse = executor;
+ toUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
@@ -546,7 +549,7 @@ public final class PushStreamProvider {
}
SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>(
- new PushStreamExecutors(toUse, acquireScheduler()), queuePolicy,
+ new PromiseFactory(toUse, acquireScheduler()), queuePolicy,
queue, parallelism,
() -> {
try {
@@ -712,7 +715,7 @@ public final class PushStreamProvider {
workerToUse = Executors.newFixedThreadPool(2);
closeExecutorOnClose = true;
} else {
- workerToUse = executor;
+ workerToUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
@@ -722,12 +725,12 @@ public final class PushStreamProvider {
timerToUse = acquireScheduler();
releaseSchedulerOnClose = true;
} else {
- timerToUse = scheduler;
+ timerToUse = Objects.requireNonNull(scheduler);
releaseSchedulerOnClose = false;
}
PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>(
- this, new PushStreamExecutors(workerToUse, timerToUse), aec -> {
+ this, new PromiseFactory(workerToUse, timerToUse), aec -> {
return () -> { /* No action to take */ };
}) {
@@ -736,7 +739,7 @@ public final class PushStreamProvider {
if (super.begin()) {
Iterator<T> it = items.iterator();
- executors.execute(() -> pushData(it));
+ promiseFactory.executor().execute(() -> pushData(it));
return true;
}
@@ -753,8 +756,9 @@ public final class PushStreamProvider {
close();
return;
} else {
- executors.schedule(
- () -> executors
+ promiseFactory.scheduledExecutor()
+ .schedule(
+ () -> promiseFactory.executor()
.execute(() -> pushData(it)),
returnValue, MILLISECONDS);
return;
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
index f47bcb1e7..314ae0830 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
@@ -76,9 +76,9 @@ public interface SimplePushEventSource<T>
* {@link #open(PushEventConsumer)} this source, and will receive subsequent
* events.
*
- * @param e the error
+ * @param t the error
*/
- void error(Exception e);
+ void error(Throwable t);
/**
* Determine whether there are any {@link PushEventConsumer}s for this
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
index 094a580ab..478d0e4a3 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
@@ -28,15 +28,15 @@ import java.util.concurrent.Semaphore;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
-import org.osgi.util.promise.Promises;
+import org.osgi.util.promise.PromiseFactory;
class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
implements SimplePushEventSource<T> {
private final Object lock = new Object();
- private final PushStreamExecutors executors;
- private final PushStreamExecutors sameThread;
+ private final PromiseFactory promiseFactory;
+ private final PromiseFactory sameThread;
private final QueuePolicy<T,U> queuePolicy;
@@ -57,13 +57,13 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
private boolean waitForFinishes;
- public SimplePushEventSourceImpl(PushStreamExecutors executors,
+ public SimplePushEventSourceImpl(PromiseFactory promiseFactory,
QueuePolicy<T,U> queuePolicy,
U queue, int parallelism, Runnable onClose) {
- this.executors = executors;
- this.sameThread = new PushStreamExecutors(
- PushStreamExecutors.inlineExecutor(),
- executors.scheduledExecutor());
+ this.promiseFactory = promiseFactory;
+ this.sameThread = new PromiseFactory(
+ PromiseFactory.inlineExecutor(),
+ promiseFactory.scheduledExecutor());
this.queuePolicy = queuePolicy;
this.queue = queue;
this.parallelism = parallelism;
@@ -111,7 +111,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) {
try {
- executors.execute(() -> safePush(pec, event));
+ promiseFactory.executor().execute(() -> safePush(pec, event));
} catch (RejectedExecutionException ree) {
// TODO log?
if (!event.isTerminal()) {
@@ -126,7 +126,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
PushEventConsumer< ? super T> pec, PushEvent<T> event) {
Deferred<Long> d = sameThread.deferred();
try {
- executors.execute(
+ promiseFactory.executor().execute(
() -> d.resolve(Long.valueOf(
System.nanoTime() + safePush(pec, event))));
} catch (RejectedExecutionException ree) {
@@ -206,8 +206,8 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
}
@Override
- public void error(Exception e) {
- enqueueEvent(PushEvent.error(e));
+ public void error(Throwable t) {
+ enqueueEvent(PushEvent.error(t));
}
private void enqueueEvent(PushEvent<T> event) {
@@ -237,7 +237,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
"unchecked", "boxing"
})
private void startWorker() {
- executors.execute(() -> {
+ promiseFactory.executor().execute(() -> {
try {
for(;;) {
@@ -287,7 +287,8 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
- System.nanoTime();
if (toWait > 0) {
- executors.schedule(this::startWorker, toWait,
+ promiseFactory.scheduledExecutor().schedule(
+ this::startWorker, toWait,
NANOSECONDS);
return;
}
@@ -297,14 +298,15 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
long toWait = p.getValue() - System.nanoTime();
if (toWait > 0) {
- executors.schedule(this::startWorker, toWait,
+ promiseFactory.scheduledExecutor().schedule(
+ this::startWorker, toWait,
NANOSECONDS);
} else {
startWorker();
}
return p;
}, p -> close(
- PushEvent.error((Exception) p.getFailure())));
+ PushEvent.error(p.getFailure())));
return;
}
}
@@ -346,8 +348,7 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
return doCall(event, pec);
}
}).collect(toList());
- return Promises
- .all(sameThread.deferred(), calls)
+ return sameThread.all(calls)
.map(l -> l.stream().max(Long::compareTo).orElseGet(
() -> Long.valueOf(System.nanoTime())));
}
@@ -375,17 +376,17 @@ class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends
if (connected.isEmpty()) {
if (connectPromise == null) {
- connectPromise = executors.deferred();
+ connectPromise = promiseFactory.deferred();
}
return connectPromise.getPromise();
} else {
- return executors.resolved(null);
+ return promiseFactory.resolved(null);
}
}
}
private Promise<Void> closedConnectPromise() {
- return executors.failed(new IllegalStateException(
+ return promiseFactory.failed(new IllegalStateException(
"This SimplePushEventSource is closed"));
}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
index 53400c4c2..eb3e93350 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
@@ -23,6 +23,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import org.osgi.util.promise.PromiseFactory;
+
class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
extends AbstractPushStreamImpl<T> implements PushStream<T> {
@@ -31,9 +33,9 @@ class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T
protected final AtomicReference<AutoCloseable> upstream = new AtomicReference<AutoCloseable>();
UnbufferedPushStreamImpl(PushStreamProvider psp,
- PushStreamExecutors executors,
+ PromiseFactory promiseFactory,
Function<PushEventConsumer<T>,AutoCloseable> connector) {
- super(psp, executors);
+ super(psp, promiseFactory);
this.connector = connector;
}

Back to the top

'`zhK #g#n8I[g[򻄩ɖªȔ|EPz}1=Y5}OGɜ=Ƙ $xb'Dhmy 6 ~~RLX";vYTQ3!\o!N!jT:kڇVu%kk>lj-]3<{!?~~lk{HHXV㸍ȗiޟpqjBAF hu;EX"EQ)JoE}WyՠQ.j 䘉!3ڦO#:dP/]nz(A%0'-Os.9 8i&Jp @'ch1dmB[Έul@C0*bI72L;%[9D<4Pg?ZLbWbSɲ` /&t@FլPw ;. "z!Jyo[ՍJJW ta5þ6>aSEBՖ^=Ɍ f8䯜+A@^Ua/ }=%Է0G<.e ~T,ϝɫl H>6vcə$#jYw7R%y[ nVZI=vص}Ñt^xOBk~l+z8C!bP@BM 6ut k\kP%XvVB;-iǽ5fN+=!zk.D6fOCwfB&a0fr˥pnoQJ$R?Jܹ_PY"hʹPH^zF' =6K+%X)m=gsJP 믯x% NqD#+Pb'!x=HsQTߊ8!ظQ=Nm J'WTQUի`x`ՃKe%VB|1vtC&.6감Ÿ҂tMGzڻޔ.ҫD 5n遑wp]o$a穓<„ĀCͷa t HL%RW VrrT^<8FNqPzՇ&t@rƻ%Ka@)a0qt>dG Y'^lpP>iXs`l;KC_76-ZjWY?K|Px׉c#;.&Z?K2.lN`( 0DIdU[ɼH B<枞N(j:^u5TEW&/glq.([^{2F{qɧg:C0HM]TG s4T{BTZim8Q-q niƳ-+*&I Ë%.ln1c"~G`*n7挵sg֬e INZ3ZDێK7KhفJt8_'Y!U(vQ`m%H9Hֶ4U= #-P3zũ Ck7O/OϤ"n<*%|H3^%#܉F4k\OZ\>*w(`++MT*k6`h9R^ 7H@˲jqfB3:h;@LwL C}18k 4]қXeUjTM>A-ĝ=6+Q(N8f&[6Dʽ'd>NZ $hh!X< 3d@_Fr(9GVCF7$u.b"#ѠZ.e@ LEșQ ??*s~M]-{N֠pOGOk8 <)EgU$] 3:o:])ЍWj^^g9EjB0(hi=C(AyjǬD(r[''{`āq&X5NW0FKz2|+>ϔK n+,%pB'&9:d`ڛ -1lJ}"n EL2O& pŏa,?/\܇%JVa!lw9ϡ8y w7'$.Ix).3f=^x,N,E\;"Gg vC7#^vXO9h?ukMN.߃ I͸0WUߖ Hy~C@m4X2:ATFO D2\a@W>m^DuCMų//Q & %Ac%3T'E r:Pxꐵi=(pr# ,R g]hlkhtcE*VBs_l7[ j+G S/4I("lz.@S~ 6 R_6Ϗ @aess\̨5O,Η3=>8 SH?ͭmʉP=0L#vT\`ǣ&^shQ85rfF"qO8l<_+[]?bMa3=Q`FSigDf XF󝙍Efv.z=nq]_$HMTJ2+kliv;g95_l,Y)Zqeóޯe^摿|0{1aRSibȎd7h%j LhXw-~mP+m_oQp7#n@pt1 xӠg*/o ̳~C,]?pp,sE]M}4 vQ*S fs}5Spo~aɱp.P@HHVvW 37oMD](/&aR^J{8,rWEoF+PF˼<mj1Ax/QwKkC^Ns+ r_AvZf} l,af +3c>q$d^4=@Wtf6nyasH(',71:7/>蒎V7h_WZ /wG HѴӯ'?XJ\uSCm0s 5>S̡ړMbe^Op[ z`lU (SBAn כWlʗ=}Y H?[[i^זK%F~;0>pRGkB=HMMkeb9OT\dRH;;\Ozs]0Iay!dmMF`+'_J$鶹.1cH %d 23=7g&t+EG[jzga,n ]h Yûg ʘR>2ebWHP+{oZsu.Vr'[{*@j)=sZZ>Hÿ`Eq<6vp(ɘ7t=_LԺټHmϼ(y3^֤RB7ƕƢ,Ow;Òxd#zNvn?w@;Opr'+wAsդ|Dg<5ؗ!@۔q jA0џ+#j?3+6^ ؘyeD3)'c[Uscm[EV."Ro*Z^<šB9cyfD%o h0Oy\0/Hr3/.~`Lo sN"|6V$\K7"lΰi\nr~qV leo"^ ?APt3 N/06[C#< eA?CtM""V5\߅G(}_TVHgaډn a0]6Hg=EBybz[XѲo>y>TP  0 K`{885^=(D-%ow5^ | y෭@g+AY#4,P3^'P:ǩCF4ҡ7eGåo\g3\X W@K¿5H=ƄɌű(x wWލ@欢ZހGjZ Gm"Q#+<Mꀰr?bGLMdy8ӄ "wu"S]m:ohbd5EDjL1PMKY]QE1Ss.z|YN`zCKcՓ ԲV r&$׿ `VA`ռVwwOm HOQh(@|ėK3Cu/}I3WOu1eN #Ӷ)Z 1X"j>_FēmiY1Gb~,8sDߕrbpU$P ;`N<!Z> 9]!rFP 6(*4#nMj ޷UPG¨l{[@O隊=2\`]+6ۯpQxƉ:sy5<_`. yáWńXkƵoT2n%ܨz)dFCi$C'ty$>ԂKY13Q F TMLj$T%(> l(Z8!rg>7Q8gʓq \iK!\ӖX9o%dxm Ny^}mY¡9p9k_St J=FLj␂FXKט#f+WƱA emHxyO:qv~Żyrj» UJ5ENC~+_*T~so_xy_l28k2:Džgy$ i0ܵ̚ 0G\\tynTU=՛<Rv{ܥ TC+ռ 'CN ;՝2_1`i8z^%ᐇIw4699ln1PBC(P~%Ϡ O>YY~v|Γ^d_ǘB5h\6 v ]/'lԞa`YEþ[,ҁ V߿ _h4Pa\΄; @ ,~)F[ ?1J7"D I&A\ (y0~dU,5.v~˗x}pLw6ɱPMQJ^ )zVUZ)EJl3y'ߢyPdZU/\q@o_ZJ$U'T>!,οi /SX[xOIA,6_OR=^}-R5^\}c8ŰWXԂӈpACPX/(ڮxo24 QFyeF3/ro{Ƃ?T##4i.;3劅bFͶT铎 g*4 Cs4fUR"!."&5b"daTq ["o=Ӌ2I1|ݣcB6a^ٳSz.UI #3-- | %'3{{3ٶ6~pǽ}m8 3Z35E@K_Sspk//-tbMDDЛv5д86h](:X㸔0l4ey '4zs>$[o>pUQ#j_Klw.0.랹XoA>TĖ?a}`ڱg[.O$G?~;N&&5%)6"(#匶řkV UZ=B>C%'A>Z9.Jj`xL`2-vI>{*wMپҪwm#>g\Yp.(-<6tȢ[L4  BC='z_ESD P{U{1yi|7L2ܓg^A0z; &ЋôE Db?j4f[= I}^`;u(f]w ̋.?XfMXҳH1tעNک0_#kb 0b] >M $JPƃ ~xj`Ifd4.*.MB{#c"=NȺ\9_:D"Ŀ!/hS?iEP%@.e&\Z|c IkM4.Wܓl)\e6z~-!,]Y,Nl+r1|'s͕X7ʻmAϛf\#f ϟ ^S "RFBR;!#hd P#N`'VbY1e⯲sDC N @ebO0CQ=&gU@㡭 #N+- 3N[ׅGqUB׍rwuqWx{DYSh]!Ki(^!ۑ(CV 7Ol/PΐB6p}`zJH)q&c=H%nՕLѮCQ>l:~xl(A$*y=uUU䞙/.^:r`+, p!Q lZ|fսWV3@)Y*5=fTEMT̷QޅX x*b\v/ʲb:NÑE*ٔ))+昼1ToCe&>TxE1AZXo"i z(~'!+ar};q"B沈IZ\p.* |Kb"&Z TR{؜kr'b \jE.aES_|y w2Eƛ#>dx61CgD =8'r%8NĽ܌\yښQP1d(q A$ ?J_?-pŒO+Gd<!ƩkWJU]"Kģ(M|Fw:}#Y/SCv٣) qF1M``=8Ǩ-gҠ= Nd |M |QmTRucIsG%уm,7@,Rq91xX]3QjP@Y?O4$YEx26LaO0DLV^Z; >Z7ojB3d #s)m_cD鑋-γEsC+> Ւxr9"Jgrڒ˓'s)H_KOԚ_3Y|?v]E.y\.CԚ?[YWD"11WÍ񲛄 "sdj _VlWB!y?CY `_= zD %$Kc#6G;) N<]ҰM.a1? Y,Wm(t'EC/+703UWw0̙=G;r\mYTdb%=ϛ@,6zĊj51C`2 nm5-C>*[#D#9RSlF(| ;lA}{?郸j#9P"epɶnPwzIe쎘U9M|yxR qrۣrMW@|j^3vS,# 0*WJ-Ӣ[:ض[~A(Wxf3{Q5",Q&ѯ?'/来Xe _JDuPgY/Q%H_7+r;̶r;WrZin9ne_-PA8-.v0 ̓ǰʎ&jCGC\Vp Z?odMzI96PᄫKlpptj ^吟O9yD(Kl>쉊#;Ȋf1[*wMM)qIk_i} @ n@Ο8O7ҼJEG$R} aʬ t">djN04ȃE1}hi7)efn!IzG^_"M^GDgNY'pu/$a D5AF@[;X׆ÿ1I4/C?s%{hS>6UaJ3u,733sQVY a |C2E}{DU1*I󖫕>&FwOiios'!hC\FyN ac3gӥr#$7*D0@'+,fS Z)D|$,v&.l M`4}C-p" W)˯jddҐmdB)h+n]׈o Ɣ!,P6e2ˋz۠ϯ.fpW'&)M27plz^ũ4Bє&E:@/|OZ1PTQw5jXSxH}>m0- p#RǙDqiUv-O;@u_oc-ߠ=[ ןݨ}7 \dbm_".QdoW6`d ^<$yx^,N0=>>e$.N?;ҜZC Zre^JJx8`~v8Ƅ :2|n KBc7':}]q M~(S:K;ܬ ;"ľp|) [x 3 &K`HxjtR/ťD_l4.nbcѐ}@TBEM>4/}rB 6iZkՍ^.Hjs^PN<_z2|iwQ:-0%v/ ѨmFpIkV?JkɺTx3&5-2O9ȝ$ ϕ5F`;h,i3b_oP,ܩ3MTAnnXż;*vFbKa96Iة4_.ʃwqepb)x( ^aBvKӇ$!kA|J$iʈ_y3`&{?mAK+F]-= PG-,p}y6]~,j`sTsSx HD΍gVBWǫ҂0,` ("| z1d+eja?Rو14")|@GKw: %XJi6ۍY.gs`p-~ @ .E7UjiH 5N!#[}ޜq)o~O6[ͽlYvp2ߌ }KJٌ!Ukr @ꯎ(g4u%[4s: JQE+*?eBϒQSj`c -<Ѕ+*@GV/ *`*3x¥b13,HtI#8,`E #W8Mߣ%}s~B&;[l΋`yõ)` ͔2V>xx$)A>)zKk%U]-Bb7.}荢9eIh۴waĔƌ$4 /[:zkMĆBnÅN!J3݊=gr-J.,TM Hh[{(GZ Gu6`=`HA[ujk$le;VEQV5k{zV"c$H7s-oc?*O_QGS)9+/2vN;m ?b\SZ.͉rBϙc7ˆGBX#ѩ5bC[o dkE??񐘸_3"ˑK$@!.m6Q:-euh<ǼաoeX* ȡ>:q zZ߲  hP8J M,jCyNn ͩ'}HJkRJ#Z V#O4XjFAB4 r/XzCY )4]íR !Wn؃.~/rTܱn&%Z/s 4/=*Y5\zTU&wh> N !b迚d+Ӓ&Rn}řѶ*iս GlƺّS  eY/D<*_yGa߃Y Mf8R]Ն.b-gޡEgtʫFw4)HYᏜy?uqYAK4[U5#SwP_ԝD~ te.ճz Z6\k/OXR3p0񺁊`RyWMd&EW s~B6F[K l*a ,8v_3FHV<78?(1Ihc_ɠӤVQݶݺEE*457e Ӗlr6ϙMrּIkN*uºF`o5eRnS,XC--2J*oa٦(C3lSKm8j "ޛ_:Xc2u6Bh(/;LQf|Z(D6aB'@̪P8#k^_I'ZK~ȵf^9fFqf(BRH'9uX{DQRn0'{ @$u'vT65뽠"d"1WeEܓ]UW퇘 M T[CôJD _ice^x''ԫ9|'A;jhVƑqM)0ޫYgٰY5}-@Y22!d;^nh sH 9Q}N# Vd" 7d~[_5Ol^ @8{q,Ӭlkǝ(H " !@9%ڈ'_)7I ItAo’rm:7ej9^}D#g%!Bh\VvCOtU6^}jqs['H' gKJCoFp LϏ@eXڈh9Q.X1zQvAs FLs`LTXjq#{ vOip4q8I 0Z"J BP`f[;ަpQ/j8ZeғgE+֥loўCڒ*vdzmSw \bw?-3=UiJL^]Ѓ*H QnIZ S[fj%4fjy{z-zނҥUL9CJP|e/k E(9Ip%MXyr6 &t;N&R͉tzʦŎjoӫ~2*rI|:Ns=r,s˲Jn q9[X)F/wk '. UFMnxfuB rևAj4j@h?GWU0,z f ˅ "KLOWDHvρRH@*d+ wdOV|iKV%98.c~9fT6K281p?7.!,/t"wC"Qd)Bfj:|OxER*xCshXL" C!iA NPhlLX+vH>2'5ۣnu(<~,U?8F:)/ctjGYR~GQ1;seB|"PyWR[[`?ӗ;^bNKJLh+qZ,5Pah3?ZZZ W炤B7 Hw{'mMKOnYƷLڨxLoNx<g$W,P#+i^"O^=tbG=R0-,sӱZꋍgP2@{ؠu-JR"~ $ \ԤUDhP t}¢]%vٽ9NHZv}d/ Ӆ3zX񧇤p'k ǁy?: V+$JI6xmKN."j|]wd@a1 m'z-X>KUȃ׿Qh{%e:a.$plu3VXjmc#SxӘ e|*VX7܄7@A\ЕSPDaS$aR4MWię$IתBx/Yf;3jкH8ˣ kBA {0i-a n$QmYΆVN6l Eq25\:q16 PIxB̙n߯?@"GDZq&#X*]DKJ߇$ska뢅夙omĶKuU~T-c:&=yu^<$y71#/fӮƟp# 8q7 ˘N?*\=D_Nފ<&IV#>Io1FH1lX>˚9/"gi1XF*PǹcI(<(ݖ(ᮤCoynĥE>It(6Nej2]=w_E^Tv!;X!zĐf:.*4CBWkeJcBdsD2aF $T*Q^w2B_̔r ceP(κ=UOى/_#CS5z1xK\1} ВLPkQ3u-F ^{ϲ$H0_Y j[c5@i?_)jq6ߵ?J9raY/cۢ)G>zF*Y30-Q"eNSw_6W@ 3mWwl;\Ze~c,D*ɇ $]]S3T_>b| &ls݄3uYmi6;҃1I&]Y.4 FWӶ`u]#%@|(RL_ŽԖŜq& 䯋&>v{P.|5>tAʷlýwTʟ3}X< ~ .n̮AX`f*-,lOZ Yk2քpee<"4!j׵*R_AѴ~O:m`t0zxN=god/cWE%">]v+#w—gdYb*O0&6WNkeBO6CcޅQ./G |F[z,pC0m>wxԞn" y?w7uYg?"@Aۮ fl _]]!w#,}_Z 4H 8qA jW|śSOS5wރuՀ`8"Ib& ˽^&5 $/Cb>(9)ҜqV#ZR1rŚEK2KDm q*Rα#{5KDtuf{) anTQ8?K] )bT3lڮUƒEv3KiLC϶}j|xwEsūom0={5.D"!1ZSwUD*d~u$+ ^"6 kSE{RPFbϰ~zΓO5I/՗\Y8_>qwgLH$%b]a,Ɓ9)Q#d麹^]X(8H7E]>ZUaz a&aSp&32ônHڝPl֠25'l[;E7!3[@ж f&s_5p(-^mg9GcL\_܋WL`tƷZ9zSg9piP*0| O$xR=?s)C$. =8 # 'ʹX6]a"S]Ziu${2H$ޒb=1v,ZW(/w1VՏ8 Y>1.Ȟ̞C;|*J`r3 /:$maNzڿ ~Wl_jo@.32_$rJy ٷ<R# N\P!fʄëe# s d&eIȮnZ^%BZ;Ct KYBu.Z ҙRS ~M|I 9}(?9&ةowߠ췝V~Րc!N鍮Gv)E`dbsTVӖr%kq0z/,r遼Jq* ,x"Z]3BKZJ{Vr+ -oYs ?Rz nN5hF͸ Gk|VW_/u*Kq39݃4K8vOˌy>΃Dq1iF|Às Q78;LC0?z JԣMϭJ:Qn,]yHI)j5T Zp V?|?e WĂFo~:Δ4׵Jjם'}K\kwRL8DFwt~O<Z'L_zHɟtM^@g iK0Ef'z`iY"JR19Λ=sm(}`Ζ /dY{LVRXyG[Tդ;5Žxؔ1[(*FQuz?JiO#^S z$F?wv rt?U/^T7:N_:3qh9r~<ջV!W$~nݪ]R2;GS7qIѮKY ]W_?7SzK.]XsQ#u/B1=waJ}W'9:N69_qHsP7@%3)+;}h2Q^ᇪf6ޝ-6G8o~E8tX<>nrSH--jTgdbMfoSz0 U˼n6ivA[9;ޫ.F}jlmDGِVt _ӥU5,)s !/x63:XH< & mBh;ȁM?ԲjtY}m12nj4`<\ 6-PznTBw$@oEp :]wE0Z, R$y9 F׳CR 駰ѳB*bdtdr ~zh}>Qŵ}\Ag=YHzfve 3Ae:mIۛҴ>;WTd${*—oͺC70v3ܪ hF^yWVJ*WҒkGZ $ Zs%6#R=ymʦp]]ə5\RjӂwnLh\wCLY,YqYJtqr2)8/-Ka RĀ> "%U4B5r6+dl(SEyo ` ,̲=oݨˏX| ] yӎny†Ԯ[|U]m C* )Մd >'8&M\:kiB ۸TtPܼg޴FNSxz}B;MN2_ÜB䧢S` h|$J:|2Xa֫/J_09o=zꂱWeil,'%gWK"vs:C^ƨ= 9«IU%8wiWƆX=-?BToA_Qڟ>l /-D2RjT2ʥ 4.m-J݌eVp,ݾ@@m^:N u\x/²J" e@¾]EJԫG JWBG2Zxѳ=ŒD+X*(y} 1ꅃIlKs2,l3<| E/YiECw`μmPoDp$gn cO pZ2s̸)m%"@FWw$f1%@8+1!aė. LIeI{TKtmn9q!dj#Q(jJ]d*˓θIS )4rANtu /~Oa{ʎ*!K>ԟJ6ڶ $ ݛFX&VT= eM;ET6dE{auvSBuԑ++oդ4سe*iRdJgAl+ll La'g/<\g~+ PYC2X3onl2 xۚ2o'_rR) 33I㹐v[YZj%`{e<Ǚ n=25=CÅGҭ1OoXy";ؘJ\~M]YC>6Ð!V5Zt=9!UExo c@E|.bI'rkeu?8M#zq rgp2h#(,Y3ol@!L /WAKVN5 V~ >HL@'ZKv|5,9Cf3)(_z؃M?MAat\w]|gG%{y >H,$\y1wP0)9Q{xa[,_9%u r9&~n]S-WC*d$rsz'xF6VS:ȘAIz)\9>7 ԍOnM"; # bb s?YDPOߎ~}~CڜXNl! ܤ pRs~G AE (r ^&t@D'ndÓ&|INx`!+5g7ڬDۓ%OTO};OZ[\\鉻orOˮ˴gH'&NBٲ'+q3-Lb#@IOz޼!gV*i"o:7rZmD˜8w1@尨a=56a8>vfrowz:13z.BZ8uoRs6^qb4QWp!y^z:sb)Ֆ&$ov ZsUuR?}Rq/(7L`f9A载0N{'zH ?WBҢvB6xߖbA [.lCg tNЦ8>B5`nm&VwSe>>N1@^V:GfߏNg 5>ɜ=P.Ey46P~u@q|YQ;2el8z_p~U|qFcj4?v!FVTԧ_7mr&a=#@QTyf@|%+do^ΙvIX(qEqUV!7Fp\s%v4?$%DKMUL3 ןkթa1 EȻEDޣۄᆤ(\g,ކ_A+Pw$c,ĝH<̘IBLFW'qq(Q,\<ڼ輼s\423D{rKI#YL)aٓ,uԗOp)+۲ -C|l M^?{mT_ݹ S&səCebvӫu9GsfyFEc47++>΂\} .mK}<zA6^ pDN3^q Y .<nLY&AMf\y!'a9 (>J .dVwB2lFx9pqmd]3RMK@/ JzH'VH E>`U5נ"{ߔ;tgB/_ɇ@ǹP<,)|Z_9 >B"B+_큇yv2|8`ØCS飻PG3eaxq h䉺Ld[n.uf,t48'_.,f٩u7Zl&A/溠v L`m8ݙ6ȘJ"DIC)՟-zcTk8\"yE:=gIֆy; sRg i{ǣlrsRhU[5ϻ 9 `7A^`H_-X +)h;2]pn޴y;,` g,-sCגWɈHy='d wg^ՋݺoP#.b19_‹5^:(td'g;0 -=UˆA74;egG'O&D&oyeV˥ݹIUk+49XN+ }GV=Eñ QÑuKGO:la>~c؟Jk5G 3) ꕸ̒Ys׷ܺQ F1Ɵd AwB 2d-٣ӂnm U64}5׷bXn ;_@醐3c4]rA A`M){@ s-ץ4J-3i>E7Zd<4ȳҹlYK5 Ҳ3nXԸHUR;T1KINg)-* Ô.~>~7uڊܵI)qa%8Ɋ@^ZDE\ 8A.O8)ͤRV[Y,[<3$a@p~p\z^D@Ƃ">n c&Đ-'ݠ[ QNh N$ mܟ)hD8TbK !h&G $f<#pa!KG/V*z𳮂l*Ir onǽqq]-V׿^י?)à <>a3K&fZU4L ;cR,F(`ژMM'2~㢣FAI{lt`tb}?Ξ{8YdArxy&0a: n 輹Z!csc8qLNxwvio>I?:ݥɬ!z U]Jq4W~8iZ)Os 1@g+dO[ԇ)?=`DPrf)W^$O?Ma nt,㗮a8HkR+en=y:`CGqE{W;zk}a#VyM >( x$ӄ*9qq8LT 9j KAן[jLbuL$ݑ:z9&B0DՑ¹?o(<Hok<LD͚:YTSۭK%Po&VH$9G2}3]dvsuݦo1h@%gmz:2.0e65!^ 5cNxo86% ]Z$XSK:ĩŘqJC[q?g[[ճt7;+lM5p\>mqaӷE=p&QL! x -V dm/ O:-.`3"ق)dD}/pW])ѝ锾V=@s3v"66'duWܐ[a6^r^ʉϞƒJ 3\ S,/2˕Wu^MrA{Oo$CAN<|^T߅<\Yzknm pd~!R 7K&QdtuKf9.#Š&b{.|풤Pjz85TydJ6tA|f]@b&+lȧS`EĜ|0= PnT/nrv>pjs3EB_Cܪq| = 6VT':E{JAi[DM+l{Qb?FkT3?)Yc ‰|,Γ }of PW<OfsN7c0}qw[ya@9wekзRG`%8j$BY;D3UG Dj|[' FܻfA4{|Az 8i'Bx9-sFӎVx &8FH1)DʐI Z&3d#0o J812 O:g=AAϛeZ6E>@ô%,s-0'[=BGrp]q(ߵB62]< (% pKM#n@bn 2AݻLu\x4 ' U W}v'-u*KT)]w/@~ ^,Kͳ $J=C t{{ c[lfRs!TOo$aJ: Kg^M}' t*Vuǩavc#E2$K =ɾH~dL~D4&.jKNƧPLsMPsh9+f@{"bh nmj3x4ֳn5[UY .VEvC@o"Z)^fM|dhWO'zp}c䨔.dg++ݡ+!K8Po%|@֥z'M#]Oς4Ƃ9aqM{z;aV=><Ѹ~j< \jy}$U/ N\R ,:EBx.2ƏP*pE CW D)0MS Շ!#c=…䬋)Չϟ8 l[/8WFɬup3S+27d;y¿<*΍MfR&ŅšnRu"ʙ-ހd]o,Sa;?ۖtcHc澾.#;*i/SoXXpYǜ>zlʻ V䫣iw<5+>6㼩yOȤ M:a"dWF[iP8 vbV<*h%OvrHp{ ?_k00_^3>!0;Ŝ \3[/?]jLgwǝ `}Q߷& e LC h2Mq cϠvUX< hUUCWGamZ$>81xtX( + UۜI.Vyn# (Ahk`;Z?93=ߪuc2[Ab%G{\?󘸄 j-KZoLxܾTֻeS';؎0s 9_g~( EWXgWQsIrZNWPSʳ99d9U$lZ0֮uKJELj0&xdlT1Kޮv f;oǞ-ʶW6USmг^Nq5]܏N4ӫ (uhܤj6PWmj2EhʤJXȟ<XC BgYjijR4ƂqT×ڨI=a*b6͔s$sHK#EJn#⚞;@~QF5(Ѽ5\Fsis/Z?aYafֻa(T37j)f9Q :l6iIQ;<;W׷S|:@C(عR(Fw0Nj1%魷5 Ŝ085~c]Wz40|;^ÚU}6]I/iE20T$l.j2smCnf,~-¿ \IjIشb Y+ /!%6zѷ[Bf[a VMxއ!g@K» eMMΰ6=u kIvمI1]և#i /h6ʹ*/W"! 0J!Uʲ$EȋoV׷8il wv