Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java')
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java157
1 files changed, 157 insertions, 0 deletions
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java
new file mode 100644
index 000000000..3627c7c9b
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java
@@ -0,0 +1,157 @@
+package org.eclipse.equinox.internal.log.stream;
+
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.log.LogEntry;
+import org.osgi.service.log.LogListener;
+import org.osgi.service.log.LogReaderService;
+import org.osgi.service.log.stream.LogStreamProvider;
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamBuilder;
+import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+
+public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider, ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>>, LogListener {
+ private final PushStreamProvider pushStreamProvider = new PushStreamProvider();
+ private final Set<LogEntrySource> logEntrySources = new CopyOnWriteArraySet<>();
+ private final ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "LogStream thread");
+ }
+ });
+ private ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
+ BundleContext context;
+ ReentrantLock eventProducerLock = new ReentrantLock();
+
+ /*
+ * (non-Javadoc)
+ * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
+ */
+ public void start(BundleContext context) throws Exception {
+ this.context = context;
+ logReaderService = new ServiceTracker<>(context, LogReaderService.class, this);
+ logReaderService.open();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
+ */
+ public void stop(BundleContext bundleContext) throws Exception {
+ logReaderService.close();
+ executor.shutdown();
+ }
+
+ @Override
+ public AtomicReference<LogReaderService> addingService(ServiceReference<LogReaderService> reference) {
+ AtomicReference<LogReaderService> tracked = new AtomicReference<>();
+ modifiedService(reference, tracked);
+ return tracked;
+ }
+
+ @Override
+ public void modifiedService(ServiceReference<LogReaderService> modifiedServiceRef, AtomicReference<LogReaderService> modifiedTracked) {
+ eventProducerLock.lock();
+ try {
+ // Check if the currently used reader service is lower ranked that the modified serviceRef
+ ServiceReference<LogReaderService> currentServiceRef = logReaderService.getServiceReference();
+ if (currentServiceRef == null || modifiedServiceRef.compareTo(currentServiceRef) > 0) {
+ // The modified service reference is higher ranked than the currently used one;
+ // Use the modified service reference instead.
+ LogReaderService readerService = context.getService(modifiedServiceRef);
+ if (readerService != null) {
+ if (modifiedTracked.get() == null) {
+ // update our tracked object for the reference with the real service
+ modifiedTracked.set(readerService);
+ }
+ // remove our listener from the currently used service
+ if (currentServiceRef != null) {
+ AtomicReference<LogReaderService> currentTracked = logReaderService.getService(currentServiceRef);
+ if (currentTracked != null) {
+ LogReaderService currentLogReader = currentTracked.get();
+ if (currentLogReader != null) {
+ // we were really using this service;
+ // remove the our listener and unget the service
+ currentLogReader.removeLogListener(this);
+ context.ungetService(currentServiceRef);
+ // finally null out our tracked reference
+ currentTracked.set(null);
+ }
+ }
+ }
+ readerService.addLogListener(this);
+ }
+ }
+ } finally {
+ eventProducerLock.unlock();
+ }
+ }
+
+ @Override
+ public void removedService(ServiceReference<LogReaderService> removedRef, AtomicReference<LogReaderService> removedTracked) {
+ eventProducerLock.lock();
+ try {
+ } finally {
+ LogReaderService removedLogReader = removedTracked.get();
+ if (removedLogReader != null) {
+ removedLogReader.removeLogListener(this);
+ context.ungetService(removedRef);
+ removedTracked.set(null);
+ }
+ ServiceReference<LogReaderService> currentRef = logReaderService.getServiceReference();
+ if (currentRef != null) {
+ AtomicReference<LogReaderService> currentTracked = logReaderService.getService(currentRef);
+ if (currentTracked != null) {
+ LogReaderService currentLogReader = currentTracked.get();
+ if (currentLogReader == null) {
+ currentLogReader = context.getService(currentRef);
+ currentTracked.set(currentLogReader);
+ }
+ if (currentLogReader != null) {
+ currentLogReader.addLogListener(this);
+ }
+ }
+ }
+ eventProducerLock.unlock();
+ }
+ }
+
+ @Override
+ public PushStream<LogEntry> createStream(Options... options) {
+ ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null;
+ if (options != null) {
+ for (Options option : options) {
+ if (Options.HISTORY.equals(option)) {
+ withHistory = logReaderService;
+ break;
+ }
+ }
+ }
+ LogEntrySource logEntrySource = new LogEntrySource(executor, withHistory);
+ PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource);
+ PushStream<LogEntry> logStream = streamBuilder.unbuffered().withExecutor(executor).create();
+ logEntrySources.add(logEntrySource);
+ return logStream;
+ }
+
+ @Override
+ public void logged(LogEntry entry) {
+ for (LogEntrySource logEntrySource : logEntrySources) {
+ logEntrySource.logged(entry);
+ }
+ }
+
+}

Back to the top