diff options
-rw-r--r-- | bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF | 9 | ||||
-rw-r--r-- | bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java | 122 | ||||
-rw-r--r-- | bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java (renamed from bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java) | 100 | ||||
-rw-r--r-- | bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java | 103 | ||||
-rw-r--r-- | bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java | 117 |
5 files changed, 388 insertions, 63 deletions
diff --git a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF index 74f3a73cc..6360391d8 100644 --- a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF +++ b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF @@ -3,12 +3,13 @@ Bundle-ManifestVersion: 2 Bundle-Name: Logstream Bundle-SymbolicName: org.eclipse.equinox.log.stream Bundle-Version: 1.0.0.qualifier -Bundle-Activator: org.eclipse.equinox.internal.log.stream.LogStreamFactoryImpl +Bundle-Activator: org.eclipse.equinox.internal.log.stream.LogStreamManager Bundle-RequiredExecutionEnvironment: JavaSE-1.8 Import-Package: org.osgi.framework;version="[1.9.0,2.0.0)", org.osgi.service.log;version="[1.4.0,2.0.0)", + org.osgi.service.log.stream;version="[1.0,1.1)", org.osgi.util.promise;version="[1.0.0,2.0.0)", + org.osgi.util.pushstream;version="[1.0,1.1)", org.osgi.util.tracker;version="[1.5.0,2.0.0)" -Bundle-ActivationPolicy: lazy -Export-Package: org.osgi.service.log.stream;version="1.0.0", - org.osgi.util.pushstream;version="1.0.0" +Export-Package: org.osgi.service.log.stream;version="1.0.0";uses:="org.osgi.util.pushstream", + org.osgi.util.pushstream;version="1.0.0";uses:="org.osgi.util.promise" diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java index 0842e6c00..ef2ff332d 100644 --- a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java +++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java @@ -1,34 +1,101 @@ +/******************************************************************************* + * Copyright (c) 2017 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * IBM Corporation - initial API and implementation + *******************************************************************************/ package org.eclipse.equinox.internal.log.stream; import java.io.Closeable; +import java.util.Enumeration; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import org.osgi.service.log.LogEntry; import org.osgi.service.log.LogReaderService; import org.osgi.util.pushstream.PushEvent; import org.osgi.util.pushstream.PushEventConsumer; import org.osgi.util.pushstream.PushEventSource; +import org.osgi.util.pushstream.PushStream; import org.osgi.util.tracker.ServiceTracker; public class LogEntrySource implements PushEventSource<LogEntry> { - private final Executor executor; - private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory; private final Set<PushEventConsumer<? super LogEntry>> consumers = new CopyOnWriteArraySet<>(); - public LogEntrySource(Executor executor, ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory) { - this.executor = executor; + private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory; + private volatile PushStream<LogEntry> logStream; + private final ReentrantLock historyLock = new ReentrantLock(); + + + + public LogEntrySource(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory) { this.withHistory = withHistory; + + } + + public PushStream<LogEntry> getLogStream() { + return logStream; + } + + public void setLogStream(PushStream<LogEntry> logStream) { + this.logStream = logStream; } - + + /* Open method isused to connect to the source and begin receiving a stream of events. + * It returns an AutoCloseable which can be used to close the event stream. + * If the close method is called on this object then the stream is terminated by sending a close event. + * (non-Javadoc) + * @see org.osgi.util.pushstream.PushEventSource#open(org.osgi.util.pushstream.PushEventConsumer) + */ + @Override public Closeable open(PushEventConsumer<? super LogEntry> aec) throws Exception { + + LinkedBlockingDeque<LogEntry> historyList = new LinkedBlockingDeque<LogEntry>(); + if (!consumers.add(aec)){ throw new IllegalStateException("Cannot add the same consumer multiple times"); } - return () -> { + + + /*when history is not equal to null then we acquire a lock to provide the full history + * to the consumer first before any other new entries + */ + if(withHistory!=null){ + historyLock.lock(); + try{ + AtomicReference<LogReaderService> readerRef = withHistory.getService(); + LogReaderService reader = readerRef.get(); + if (reader != null){ + // Enumeration has the most recent entry first + Enumeration<LogEntry> e= reader.getLog(); + if(e!=null){ + while(e.hasMoreElements()){ + historyList.add(e.nextElement()); + } + } + //Logging the history in the order of their appearance + if(historyList!=null){ + while(!historyList.isEmpty()){ + LogEntry logEntry = historyList.removeLast(); + logged(logEntry); + } + } + } + } + finally{ + historyLock.unlock(); + } + } + + Closeable result = () -> { if (consumers.remove(aec)) { try { aec.accept(PushEvent.close()); @@ -37,19 +104,42 @@ public class LogEntrySource implements PushEventSource<LogEntry> { } } }; + + return result; } - + + + public void logged(LogEntry entry) { - for (PushEventConsumer<? super LogEntry> consumer : consumers) { - try { - long status = consumer.accept(PushEvent.data(entry)); - if (status < 0) { - consumer.accept(PushEvent.close()); - } - } catch (Exception e) { + if (withHistory != null) { + historyLock.lock(); + } + + /*consumer accepts the incoming log entries and returns a back pressure. + * A return of zero indicates that event delivery may continue immediately. + * A positive return value indicates that the source should delay sending any further events for the requested number of milliseconds. + * A return value of -1 indicates that no further events should be sent and that the stream can be closed. + * @see org.osgi.util.pushstream.PushEventConsumer<T> + */ + try{ + for (PushEventConsumer<? super LogEntry> consumer : consumers) { + try { + long status = consumer.accept(PushEvent.data(entry)); + + if (status < 0) { + consumer.accept(PushEvent.close()); + } + + } catch (Exception e) { // we ignore exceptions here for log stream + } } } + finally{ + if(withHistory != null){ + historyLock.unlock(); + } + + } } - } diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java index 3627c7c9b..6953a6cbc 100644 --- a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java +++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java @@ -1,37 +1,39 @@ +/******************************************************************************* + * Copyright (c) 2017 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * IBM Corporation - initial API and implementation + *******************************************************************************/ package org.eclipse.equinox.internal.log.stream; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; import org.osgi.service.log.LogEntry; import org.osgi.service.log.LogListener; import org.osgi.service.log.LogReaderService; import org.osgi.service.log.stream.LogStreamProvider; -import org.osgi.util.pushstream.PushEvent; -import org.osgi.util.pushstream.PushStream; -import org.osgi.util.pushstream.PushStreamBuilder; -import org.osgi.util.pushstream.PushStreamProvider; import org.osgi.util.tracker.ServiceTracker; import org.osgi.util.tracker.ServiceTrackerCustomizer; -public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider, ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>>, LogListener { - private final PushStreamProvider pushStreamProvider = new PushStreamProvider(); - private final Set<LogEntrySource> logEntrySources = new CopyOnWriteArraySet<>(); - private final ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "LogStream thread"); - } - }); + +/* LogStreamManager is used to start and stop the bundle and keeps the track of the logs using the + * ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>> which listens to + * the incoming logs using the LogListener. It is also responsible to provide service tracker + * and each log entry to the LogStreamProviderFactory. + * + */ +public class LogStreamManager implements BundleActivator, ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>>, LogListener { + private ServiceRegistration<LogStreamProvider> logStreamServiceRegistration; + private LogStreamProviderFactory logStreamProviderFactory; private ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService; BundleContext context; ReentrantLock eventProducerLock = new ReentrantLock(); @@ -41,9 +43,14 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider, * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) */ public void start(BundleContext context) throws Exception { + this.context = context; logReaderService = new ServiceTracker<>(context, LogReaderService.class, this); logReaderService.open(); + + logStreamProviderFactory = new LogStreamProviderFactory(logReaderService); + logStreamServiceRegistration = context.registerService(LogStreamProvider.class, logStreamProviderFactory, null); + } /* @@ -52,16 +59,27 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider, */ public void stop(BundleContext bundleContext) throws Exception { logReaderService.close(); - executor.shutdown(); + logStreamServiceRegistration.unregister(); + logStreamServiceRegistration = null; } + /* + * (non-Javadoc) + * @see org.osgi.util.tracker.ServiceTrackerCustomizer#addingService(org.osgi.framework.ServiceReference) + */ + @Override public AtomicReference<LogReaderService> addingService(ServiceReference<LogReaderService> reference) { AtomicReference<LogReaderService> tracked = new AtomicReference<>(); modifiedService(reference, tracked); return tracked; } - + + + /* + * (non-Javadoc) + * @see org.osgi.util.tracker.ServiceTrackerCustomizer#modifiedService(org.osgi.framework.ServiceReference, java.lang.Object) + */ @Override public void modifiedService(ServiceReference<LogReaderService> modifiedServiceRef, AtomicReference<LogReaderService> modifiedTracked) { eventProducerLock.lock(); @@ -84,7 +102,7 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider, LogReaderService currentLogReader = currentTracked.get(); if (currentLogReader != null) { // we were really using this service; - // remove the our listener and unget the service + // remove our listener and unget the service currentLogReader.removeLogListener(this); context.ungetService(currentServiceRef); // finally null out our tracked reference @@ -92,6 +110,7 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider, } } } + readerService.addLogListener(this); } } @@ -99,14 +118,21 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider, eventProducerLock.unlock(); } } - + + + /* + * (non-Javadoc) + * @see org.osgi.util.tracker.ServiceTrackerCustomizer#removedService(org.osgi.framework.ServiceReference, java.lang.Object) + */ @Override public void removedService(ServiceReference<LogReaderService> removedRef, AtomicReference<LogReaderService> removedTracked) { + eventProducerLock.lock(); try { } finally { LogReaderService removedLogReader = removedTracked.get(); if (removedLogReader != null) { + // remove the listener removedLogReader.removeLogListener(this); context.ungetService(removedRef); removedTracked.set(null); @@ -129,29 +155,17 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider, } } - @Override - public PushStream<LogEntry> createStream(Options... options) { - ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null; - if (options != null) { - for (Options option : options) { - if (Options.HISTORY.equals(option)) { - withHistory = logReaderService; - break; - } - } - } - LogEntrySource logEntrySource = new LogEntrySource(executor, withHistory); - PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource); - PushStream<LogEntry> logStream = streamBuilder.unbuffered().withExecutor(executor).create(); - logEntrySources.add(logEntrySource); - return logStream; - } + + + /* It is used to post each log entry to the LogStreamProviderFactory + * (non-Javadoc) + * @see org.osgi.service.log.LogListener#logged(org.osgi.service.log.LogEntry) + */ @Override public void logged(LogEntry entry) { - for (LogEntrySource logEntrySource : logEntrySources) { - logEntrySource.logged(entry); - } + + logStreamProviderFactory.postLogEntry(entry); } } diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java new file mode 100644 index 000000000..00ba1e882 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java @@ -0,0 +1,103 @@ +/******************************************************************************* + * Copyright (c) 2017 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * IBM Corporation - initial API and implementation + *******************************************************************************/ +package org.eclipse.equinox.internal.log.stream; +import org.osgi.service.log.LogEntry; +import org.osgi.service.log.LogReaderService; +import org.osgi.service.log.stream.LogStreamProvider; +import org.osgi.util.tracker.ServiceTracker; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.osgi.framework.Bundle; +import org.osgi.framework.ServiceFactory; +import org.osgi.framework.ServiceRegistration; + +public class LogStreamProviderFactory implements ServiceFactory<LogStreamProvider> { + + Map<Bundle, LogStreamProviderImpl> providers = new HashMap<Bundle,LogStreamProviderImpl>() ; + ReentrantReadWriteLock eventProducerLock = new ReentrantReadWriteLock(); + ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService; + + public LogStreamProviderFactory(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService){ + this.logReaderService = logReaderService; + } + + + /*Reader of providers map + * 1) for each provider + * - post entry to provider + */ + public void postLogEntry(LogEntry entry) { + eventProducerLock.readLock().lock(); + try{ + for(LogStreamProviderImpl provider : providers.values()){ + provider.logged(entry); + } + } + finally{ + eventProducerLock.readLock().unlock(); + } + + } + + /* Writer to providers map + * 1) create new LogStreamProviderImpl + * 2) put new instance in map + * 3) return new instance + * (non-Javadoc) + * @see org.osgi.framework.ServiceFactory#getService(org.osgi.framework.Bundle, org.osgi.framework.ServiceRegistration) + */ + + @Override + public LogStreamProviderImpl getService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration) { + LogStreamProviderImpl logStreamProviderImpl = new LogStreamProviderImpl(logReaderService); + eventProducerLock.writeLock().lock(); + try{ + providers.put(bundle, logStreamProviderImpl); + return logStreamProviderImpl; + } + finally{ + eventProducerLock.writeLock().unlock(); + } + } + + /* 1) Remove the logStreamProviderImpl instance associated with the bundle + * 2) close all existing LogStreams from the provider, outside the write lock + * (non-Javadoc) + * @see org.osgi.framework.ServiceFactory#ungetService(org.osgi.framework.Bundle, org.osgi.framework.ServiceRegistration, java.lang.Object) + */ + + @Override + public void ungetService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration, + LogStreamProvider service) { + + LogStreamProviderImpl logStreamProviderImpl; + + eventProducerLock.writeLock().lock(); + try{ + logStreamProviderImpl = providers.remove(bundle); + } + finally{ + eventProducerLock.writeLock().unlock(); + } + + logStreamProviderImpl.close(); + + } + + + + + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java new file mode 100644 index 000000000..0ceb63bdd --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java @@ -0,0 +1,117 @@ +/******************************************************************************* + * Copyright (c) 2017 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * IBM Corporation - initial API and implementation + *******************************************************************************/ +package org.eclipse.equinox.internal.log.stream; + +import java.util.Collections; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.osgi.service.log.LogEntry; +import org.osgi.service.log.LogReaderService; +import org.osgi.service.log.stream.LogStreamProvider; +import org.osgi.util.pushstream.PushEvent; +import org.osgi.util.pushstream.PushStream; +import org.osgi.util.pushstream.PushStreamBuilder; +import org.osgi.util.pushstream.PushStreamProvider; +import org.osgi.util.tracker.ServiceTracker; + +public class LogStreamProviderImpl implements LogStreamProvider { + private final PushStreamProvider pushStreamProvider = new PushStreamProvider(); + private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService; + private final WeakHashMap<LogEntrySource,Boolean> weakMap = new WeakHashMap<>(); + private final Set<LogEntrySource> logEntrySources = Collections.newSetFromMap(weakMap); + + private final ReentrantReadWriteLock historyLock = new ReentrantReadWriteLock(); + + + + public LogStreamProviderImpl( + ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) { + this.logReaderService = logReaderService; + } + + + /* Create a PushStream of {@link LogEntry} objects. + * The returned PushStream is an unbuffered stream with a parallelism of one. + * (non-Javadoc) + * @see org.osgi.service.log.stream.LogStreamProvider#createStream(org.osgi.service.log.stream.LogStreamProvider.Options[]) + */ + + @Override + public PushStream<LogEntry> createStream(Options... options) { + ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null; + if (options != null) { + for (Options option : options) { + if (Options.HISTORY.equals(option)) { + withHistory = logReaderService; + } + } + } + + + // A write lock is acquired in order to add logEntrySource into the Set of logEntrySources. + historyLock.writeLock().lock(); + try{ + LogEntrySource logEntrySource = new LogEntrySource(withHistory); + PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource); + //creating an unbuffered stream + PushStream<LogEntry> logStream = streamBuilder.unbuffered().create(); + logEntrySource.setLogStream(logStream); + // Adding to sources makes the source start listening for new entries + logEntrySources.add(logEntrySource); + return logStream; + } + finally{ + historyLock.writeLock().unlock(); + } + } + + /* + * Send the incoming log entries to the logEntrySource.logged(entry) for the consumer to accept it. + */ + public void logged(LogEntry entry) { + historyLock.readLock().lock(); + try{ + for (LogEntrySource logEntrySource : logEntrySources) { + logEntrySource.logged(entry); + } + } + finally{ + historyLock.readLock().unlock(); + } + } + + + /* + * Closing the stream for each source. + */ + public void close() { + PushStream<LogEntry> logStream; + historyLock.readLock().lock(); + try{ + for(LogEntrySource logEntrySource : logEntrySources) { + logStream = logEntrySource.getLogStream(); + try { + logStream.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }finally{ + historyLock.readLock().unlock(); + } + } + +} |