diff options
author | Anjum Fatima | 2017-09-08 17:49:46 +0000 |
---|---|---|
committer | Anjum Fatima | 2017-09-08 17:57:08 +0000 |
commit | 3caa76c880b6dc05b49a06d3ff10f675391eb517 (patch) | |
tree | 8a150fa26357bbff2bc9f12d96ed81537339bad0 | |
parent | bb900c876e84cdebed182b36da33eb23f29435f8 (diff) | |
download | rt.equinox.bundles-I20170909-0800.tar.gz rt.equinox.bundles-I20170909-0800.tar.xz rt.equinox.bundles-I20170909-0800.zip |
Bug 521208 - PushStream returned by LogStreamProvider should be bufferedI20170913-1120I20170913-0800I20170913-0420I20170913-0220I20170912-2255I20170912-2000I20170912-0800I20170911-2000I20170911-0800I20170911-0405I20170910-2000I20170910-1055I20170910-0800I20170909-1500I20170909-0920I20170909-0800I20170909-0510I20170908-2345I20170908-2000
rather than unbuffered
Change-Id: I4ede21315e1d47ee5429ee70eed623bb6adda2cb
Signed-off-by: Anjum Fatima <anjum.eclipse@gmail.com>
3 files changed, 38 insertions, 13 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java index 8cf9a5cfa..8c8a9712d 100644 --- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java +++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java @@ -33,8 +33,8 @@ public class LogStreamManager implements BundleActivator, ServiceTrackerCustomiz private ServiceRegistration<LogStreamProvider> logStreamServiceRegistration; private LogStreamProviderFactory logStreamProviderFactory; private ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService; - BundleContext context; - ReentrantLock eventProducerLock = new ReentrantLock(); + private BundleContext context; + private final ReentrantLock eventProducerLock = new ReentrantLock(); /* * (non-Javadoc) @@ -42,14 +42,11 @@ public class LogStreamManager implements BundleActivator, ServiceTrackerCustomiz */ @Override public void start(BundleContext bc) throws Exception { - this.context = bc; logReaderService = new ServiceTracker<>(context, LogReaderService.class, this); logReaderService.open(); - logStreamProviderFactory = new LogStreamProviderFactory(logReaderService); logStreamServiceRegistration = context.registerService(LogStreamProvider.class, logStreamProviderFactory, null); - } /* @@ -61,6 +58,7 @@ public class LogStreamManager implements BundleActivator, ServiceTrackerCustomiz logReaderService.close(); logStreamServiceRegistration.unregister(); logStreamServiceRegistration = null; + logStreamProviderFactory.shutdownExecutor(); } /* @@ -159,7 +157,6 @@ public class LogStreamManager implements BundleActivator, ServiceTrackerCustomiz @Override public void logged(LogEntry entry) { - logStreamProviderFactory.postLogEntry(entry); } diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java index f47d0fa19..058207158 100644 --- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java +++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java @@ -12,6 +12,9 @@ package org.eclipse.equinox.internal.log.stream; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.osgi.framework.Bundle; @@ -28,6 +31,16 @@ public class LogStreamProviderFactory implements ServiceFactory<LogStreamProvide ReentrantReadWriteLock eventProducerLock = new ReentrantReadWriteLock(); ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService; + /* + * ExecutorService is used to provide parallelism of one by making sure only one thread is used for the executor + */ + private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "LogStream thread"); + } + }); + public LogStreamProviderFactory(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) { this.logReaderService = logReaderService; } @@ -58,7 +71,7 @@ public class LogStreamProviderFactory implements ServiceFactory<LogStreamProvide @Override public LogStreamProviderImpl getService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration) { - LogStreamProviderImpl logStreamProviderImpl = new LogStreamProviderImpl(logReaderService); + LogStreamProviderImpl logStreamProviderImpl = new LogStreamProviderImpl(logReaderService, executor); eventProducerLock.writeLock().lock(); try { providers.put(bundle, logStreamProviderImpl); @@ -85,9 +98,15 @@ public class LogStreamProviderFactory implements ServiceFactory<LogStreamProvide } finally { eventProducerLock.writeLock().unlock(); } - logStreamProviderImpl.close(); } + /* + * Shutdown the executor + */ + public void shutdownExecutor() { + executor.shutdown(); + } + } diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java index 7c52b5afe..94d93721d 100644 --- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java +++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java @@ -14,6 +14,8 @@ import java.util.Collections; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.osgi.service.log.LogEntry; @@ -23,6 +25,7 @@ import org.osgi.util.pushstream.PushEvent; import org.osgi.util.pushstream.PushStream; import org.osgi.util.pushstream.PushStreamBuilder; import org.osgi.util.pushstream.PushStreamProvider; +import org.osgi.util.pushstream.QueuePolicyOption; import org.osgi.util.tracker.ServiceTracker; public class LogStreamProviderImpl implements LogStreamProvider { @@ -32,17 +35,22 @@ public class LogStreamProviderImpl implements LogStreamProvider { private final Set<LogEntrySource> logEntrySources = Collections.newSetFromMap(weakMap); private final ReentrantReadWriteLock historyLock = new ReentrantReadWriteLock(); + private final ExecutorService executor; - public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) { + public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService, ExecutorService executor) { this.logReaderService = logReaderService; + this.executor = executor; } /* Create a PushStream of {@link LogEntry} objects. - * The returned PushStream is an unbuffered stream with a parallelism of one. + * The returned PushStream is + * Buffered with a buffer large enough to contain the history, if included. + * Have the QueuePolicyOption.DISCARD_OLDEST queue policy option. + * Use a shared executor. + * Have a parallelism of one. * (non-Javadoc) * @see org.osgi.service.log.stream.LogStreamProvider#createStream(org.osgi.service.log.stream.LogStreamProvider.Options[]) */ - @Override public PushStream<LogEntry> createStream(Options... options) { ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null; @@ -59,8 +67,9 @@ public class LogStreamProviderImpl implements LogStreamProvider { try { LogEntrySource logEntrySource = new LogEntrySource(withHistory); PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource); - //creating an unbuffered stream - PushStream<LogEntry> logStream = streamBuilder.unbuffered().build(); + //creating a buffered push stream + LinkedBlockingQueue<PushEvent<? extends LogEntry>> historyQueue = new LinkedBlockingQueue<>(); + PushStream<LogEntry> logStream = streamBuilder.withBuffer(historyQueue).withExecutor(executor).withQueuePolicy(QueuePolicyOption.DISCARD_OLDEST).build(); logEntrySource.setLogStream(logStream); // Adding to sources makes the source start listening for new entries logEntrySources.add(logEntrySource); |