Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnjum Fatima2017-09-08 17:49:46 +0000
committerAnjum Fatima2017-09-08 17:57:08 +0000
commit3caa76c880b6dc05b49a06d3ff10f675391eb517 (patch)
tree8a150fa26357bbff2bc9f12d96ed81537339bad0
parentbb900c876e84cdebed182b36da33eb23f29435f8 (diff)
downloadrt.equinox.bundles-I20170908-2000.tar.gz
rt.equinox.bundles-I20170908-2000.tar.xz
rt.equinox.bundles-I20170908-2000.zip
rather than unbuffered Change-Id: I4ede21315e1d47ee5429ee70eed623bb6adda2cb Signed-off-by: Anjum Fatima <anjum.eclipse@gmail.com>
-rw-r--r--bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java9
-rw-r--r--bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java23
-rw-r--r--bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java19
3 files changed, 38 insertions, 13 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java
index 8cf9a5cfa..8c8a9712d 100644
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java
+++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java
@@ -33,8 +33,8 @@ public class LogStreamManager implements BundleActivator, ServiceTrackerCustomiz
private ServiceRegistration<LogStreamProvider> logStreamServiceRegistration;
private LogStreamProviderFactory logStreamProviderFactory;
private ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
- BundleContext context;
- ReentrantLock eventProducerLock = new ReentrantLock();
+ private BundleContext context;
+ private final ReentrantLock eventProducerLock = new ReentrantLock();
/*
* (non-Javadoc)
@@ -42,14 +42,11 @@ public class LogStreamManager implements BundleActivator, ServiceTrackerCustomiz
*/
@Override
public void start(BundleContext bc) throws Exception {
-
this.context = bc;
logReaderService = new ServiceTracker<>(context, LogReaderService.class, this);
logReaderService.open();
-
logStreamProviderFactory = new LogStreamProviderFactory(logReaderService);
logStreamServiceRegistration = context.registerService(LogStreamProvider.class, logStreamProviderFactory, null);
-
}
/*
@@ -61,6 +58,7 @@ public class LogStreamManager implements BundleActivator, ServiceTrackerCustomiz
logReaderService.close();
logStreamServiceRegistration.unregister();
logStreamServiceRegistration = null;
+ logStreamProviderFactory.shutdownExecutor();
}
/*
@@ -159,7 +157,6 @@ public class LogStreamManager implements BundleActivator, ServiceTrackerCustomiz
@Override
public void logged(LogEntry entry) {
-
logStreamProviderFactory.postLogEntry(entry);
}
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java
index f47d0fa19..058207158 100644
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java
+++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java
@@ -12,6 +12,9 @@ package org.eclipse.equinox.internal.log.stream;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.osgi.framework.Bundle;
@@ -28,6 +31,16 @@ public class LogStreamProviderFactory implements ServiceFactory<LogStreamProvide
ReentrantReadWriteLock eventProducerLock = new ReentrantReadWriteLock();
ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
+ /*
+ * ExecutorService is used to provide parallelism of one by making sure only one thread is used for the executor
+ */
+ private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "LogStream thread");
+ }
+ });
+
public LogStreamProviderFactory(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) {
this.logReaderService = logReaderService;
}
@@ -58,7 +71,7 @@ public class LogStreamProviderFactory implements ServiceFactory<LogStreamProvide
@Override
public LogStreamProviderImpl getService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration) {
- LogStreamProviderImpl logStreamProviderImpl = new LogStreamProviderImpl(logReaderService);
+ LogStreamProviderImpl logStreamProviderImpl = new LogStreamProviderImpl(logReaderService, executor);
eventProducerLock.writeLock().lock();
try {
providers.put(bundle, logStreamProviderImpl);
@@ -85,9 +98,15 @@ public class LogStreamProviderFactory implements ServiceFactory<LogStreamProvide
} finally {
eventProducerLock.writeLock().unlock();
}
-
logStreamProviderImpl.close();
}
+ /*
+ * Shutdown the executor
+ */
+ public void shutdownExecutor() {
+ executor.shutdown();
+ }
+
}
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
index 7c52b5afe..94d93721d 100644
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
@@ -14,6 +14,8 @@ import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.osgi.service.log.LogEntry;
@@ -23,6 +25,7 @@ import org.osgi.util.pushstream.PushEvent;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.PushStreamBuilder;
import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.pushstream.QueuePolicyOption;
import org.osgi.util.tracker.ServiceTracker;
public class LogStreamProviderImpl implements LogStreamProvider {
@@ -32,17 +35,22 @@ public class LogStreamProviderImpl implements LogStreamProvider {
private final Set<LogEntrySource> logEntrySources = Collections.newSetFromMap(weakMap);
private final ReentrantReadWriteLock historyLock = new ReentrantReadWriteLock();
+ private final ExecutorService executor;
- public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) {
+ public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService, ExecutorService executor) {
this.logReaderService = logReaderService;
+ this.executor = executor;
}
/* Create a PushStream of {@link LogEntry} objects.
- * The returned PushStream is an unbuffered stream with a parallelism of one.
+ * The returned PushStream is
+ * Buffered with a buffer large enough to contain the history, if included.
+ * Have the QueuePolicyOption.DISCARD_OLDEST queue policy option.
+ * Use a shared executor.
+ * Have a parallelism of one.
* (non-Javadoc)
* @see org.osgi.service.log.stream.LogStreamProvider#createStream(org.osgi.service.log.stream.LogStreamProvider.Options[])
*/
-
@Override
public PushStream<LogEntry> createStream(Options... options) {
ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null;
@@ -59,8 +67,9 @@ public class LogStreamProviderImpl implements LogStreamProvider {
try {
LogEntrySource logEntrySource = new LogEntrySource(withHistory);
PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource);
- //creating an unbuffered stream
- PushStream<LogEntry> logStream = streamBuilder.unbuffered().build();
+ //creating a buffered push stream
+ LinkedBlockingQueue<PushEvent<? extends LogEntry>> historyQueue = new LinkedBlockingQueue<>();
+ PushStream<LogEntry> logStream = streamBuilder.withBuffer(historyQueue).withExecutor(executor).withQueuePolicy(QueuePolicyOption.DISCARD_OLDEST).build();
logEntrySource.setLogStream(logStream);
// Adding to sources makes the source start listening for new entries
logEntrySources.add(logEntrySource);

Back to the top