Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnjum Fatima2017-05-12 17:51:57 +0000
committerThomas Watson2017-06-16 12:38:08 +0000
commitf8c001d4d52fba3998b6ff01f36496f6c81af3c7 (patch)
treebcf45245721f5e7a1057331717e879c411e414d3
parentb0b632a4b575f3d996560ae5915370025d18234d (diff)
downloadrt.equinox.framework-f8c001d4d52fba3998b6ff01f36496f6c81af3c7.tar.gz
rt.equinox.framework-f8c001d4d52fba3998b6ff01f36496f6c81af3c7.tar.xz
rt.equinox.framework-f8c001d4d52fba3998b6ff01f36496f6c81af3c7.zip
Bug 516761 - [osgi R7] Implement new LogStreamProvider service
Change-Id: I2b6af3240a26b766777011ead6e04fe6d94a791c Signed-off-by: Anjum Fatima <anjum.eclipse@gmail.com>
-rw-r--r--bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF9
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java122
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java (renamed from bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java)100
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java103
-rw-r--r--bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java117
5 files changed, 388 insertions, 63 deletions
diff --git a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF
index 74f3a73cc..6360391d8 100644
--- a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF
+++ b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF
@@ -3,12 +3,13 @@ Bundle-ManifestVersion: 2
Bundle-Name: Logstream
Bundle-SymbolicName: org.eclipse.equinox.log.stream
Bundle-Version: 1.0.0.qualifier
-Bundle-Activator: org.eclipse.equinox.internal.log.stream.LogStreamFactoryImpl
+Bundle-Activator: org.eclipse.equinox.internal.log.stream.LogStreamManager
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
Import-Package: org.osgi.framework;version="[1.9.0,2.0.0)",
org.osgi.service.log;version="[1.4.0,2.0.0)",
+ org.osgi.service.log.stream;version="[1.0,1.1)",
org.osgi.util.promise;version="[1.0.0,2.0.0)",
+ org.osgi.util.pushstream;version="[1.0,1.1)",
org.osgi.util.tracker;version="[1.5.0,2.0.0)"
-Bundle-ActivationPolicy: lazy
-Export-Package: org.osgi.service.log.stream;version="1.0.0",
- org.osgi.util.pushstream;version="1.0.0"
+Export-Package: org.osgi.service.log.stream;version="1.0.0";uses:="org.osgi.util.pushstream",
+ org.osgi.util.pushstream;version="1.0.0";uses:="org.osgi.util.promise"
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java
index 0842e6c00..ef2ff332d 100644
--- a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java
+++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java
@@ -1,34 +1,101 @@
+/*******************************************************************************
+ * Copyright (c) 2017 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * IBM Corporation - initial API and implementation
+ *******************************************************************************/
package org.eclipse.equinox.internal.log.stream;
import java.io.Closeable;
+import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import org.osgi.service.log.LogEntry;
import org.osgi.service.log.LogReaderService;
import org.osgi.util.pushstream.PushEvent;
import org.osgi.util.pushstream.PushEventConsumer;
import org.osgi.util.pushstream.PushEventSource;
+import org.osgi.util.pushstream.PushStream;
import org.osgi.util.tracker.ServiceTracker;
public class LogEntrySource implements PushEventSource<LogEntry> {
- private final Executor executor;
- private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory;
private final Set<PushEventConsumer<? super LogEntry>> consumers = new CopyOnWriteArraySet<>();
- public LogEntrySource(Executor executor, ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory) {
- this.executor = executor;
+ private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory;
+ private volatile PushStream<LogEntry> logStream;
+ private final ReentrantLock historyLock = new ReentrantLock();
+
+
+
+ public LogEntrySource(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory) {
this.withHistory = withHistory;
+
+ }
+
+ public PushStream<LogEntry> getLogStream() {
+ return logStream;
+ }
+
+ public void setLogStream(PushStream<LogEntry> logStream) {
+ this.logStream = logStream;
}
-
+
+ /* Open method isused to connect to the source and begin receiving a stream of events.
+ * It returns an AutoCloseable which can be used to close the event stream.
+ * If the close method is called on this object then the stream is terminated by sending a close event.
+ * (non-Javadoc)
+ * @see org.osgi.util.pushstream.PushEventSource#open(org.osgi.util.pushstream.PushEventConsumer)
+ */
+
@Override
public Closeable open(PushEventConsumer<? super LogEntry> aec) throws Exception {
+
+ LinkedBlockingDeque<LogEntry> historyList = new LinkedBlockingDeque<LogEntry>();
+
if (!consumers.add(aec)){
throw new IllegalStateException("Cannot add the same consumer multiple times");
}
- return () -> {
+
+
+ /*when history is not equal to null then we acquire a lock to provide the full history
+ * to the consumer first before any other new entries
+ */
+ if(withHistory!=null){
+ historyLock.lock();
+ try{
+ AtomicReference<LogReaderService> readerRef = withHistory.getService();
+ LogReaderService reader = readerRef.get();
+ if (reader != null){
+ // Enumeration has the most recent entry first
+ Enumeration<LogEntry> e= reader.getLog();
+ if(e!=null){
+ while(e.hasMoreElements()){
+ historyList.add(e.nextElement());
+ }
+ }
+ //Logging the history in the order of their appearance
+ if(historyList!=null){
+ while(!historyList.isEmpty()){
+ LogEntry logEntry = historyList.removeLast();
+ logged(logEntry);
+ }
+ }
+ }
+ }
+ finally{
+ historyLock.unlock();
+ }
+ }
+
+ Closeable result = () -> {
if (consumers.remove(aec)) {
try {
aec.accept(PushEvent.close());
@@ -37,19 +104,42 @@ public class LogEntrySource implements PushEventSource<LogEntry> {
}
}
};
+
+ return result;
}
-
+
+
+
public void logged(LogEntry entry) {
- for (PushEventConsumer<? super LogEntry> consumer : consumers) {
- try {
- long status = consumer.accept(PushEvent.data(entry));
- if (status < 0) {
- consumer.accept(PushEvent.close());
- }
- } catch (Exception e) {
+ if (withHistory != null) {
+ historyLock.lock();
+ }
+
+ /*consumer accepts the incoming log entries and returns a back pressure.
+ * A return of zero indicates that event delivery may continue immediately.
+ * A positive return value indicates that the source should delay sending any further events for the requested number of milliseconds.
+ * A return value of -1 indicates that no further events should be sent and that the stream can be closed.
+ * @see org.osgi.util.pushstream.PushEventConsumer<T>
+ */
+ try{
+ for (PushEventConsumer<? super LogEntry> consumer : consumers) {
+ try {
+ long status = consumer.accept(PushEvent.data(entry));
+
+ if (status < 0) {
+ consumer.accept(PushEvent.close());
+ }
+
+ } catch (Exception e) {
// we ignore exceptions here for log stream
+ }
}
}
+ finally{
+ if(withHistory != null){
+ historyLock.unlock();
+ }
+
+ }
}
-
}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java
index 3627c7c9b..6953a6cbc 100644
--- a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java
+++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java
@@ -1,37 +1,39 @@
+/*******************************************************************************
+ * Copyright (c) 2017 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * IBM Corporation - initial API and implementation
+ *******************************************************************************/
package org.eclipse.equinox.internal.log.stream;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
import org.osgi.service.log.LogEntry;
import org.osgi.service.log.LogListener;
import org.osgi.service.log.LogReaderService;
import org.osgi.service.log.stream.LogStreamProvider;
-import org.osgi.util.pushstream.PushEvent;
-import org.osgi.util.pushstream.PushStream;
-import org.osgi.util.pushstream.PushStreamBuilder;
-import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
-public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider, ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>>, LogListener {
- private final PushStreamProvider pushStreamProvider = new PushStreamProvider();
- private final Set<LogEntrySource> logEntrySources = new CopyOnWriteArraySet<>();
- private final ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "LogStream thread");
- }
- });
+
+/* LogStreamManager is used to start and stop the bundle and keeps the track of the logs using the
+ * ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>> which listens to
+ * the incoming logs using the LogListener. It is also responsible to provide service tracker
+ * and each log entry to the LogStreamProviderFactory.
+ *
+ */
+public class LogStreamManager implements BundleActivator, ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>>, LogListener {
+ private ServiceRegistration<LogStreamProvider> logStreamServiceRegistration;
+ private LogStreamProviderFactory logStreamProviderFactory;
private ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
BundleContext context;
ReentrantLock eventProducerLock = new ReentrantLock();
@@ -41,9 +43,14 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider,
* @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
*/
public void start(BundleContext context) throws Exception {
+
this.context = context;
logReaderService = new ServiceTracker<>(context, LogReaderService.class, this);
logReaderService.open();
+
+ logStreamProviderFactory = new LogStreamProviderFactory(logReaderService);
+ logStreamServiceRegistration = context.registerService(LogStreamProvider.class, logStreamProviderFactory, null);
+
}
/*
@@ -52,16 +59,27 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider,
*/
public void stop(BundleContext bundleContext) throws Exception {
logReaderService.close();
- executor.shutdown();
+ logStreamServiceRegistration.unregister();
+ logStreamServiceRegistration = null;
}
+ /*
+ * (non-Javadoc)
+ * @see org.osgi.util.tracker.ServiceTrackerCustomizer#addingService(org.osgi.framework.ServiceReference)
+ */
+
@Override
public AtomicReference<LogReaderService> addingService(ServiceReference<LogReaderService> reference) {
AtomicReference<LogReaderService> tracked = new AtomicReference<>();
modifiedService(reference, tracked);
return tracked;
}
-
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.osgi.util.tracker.ServiceTrackerCustomizer#modifiedService(org.osgi.framework.ServiceReference, java.lang.Object)
+ */
@Override
public void modifiedService(ServiceReference<LogReaderService> modifiedServiceRef, AtomicReference<LogReaderService> modifiedTracked) {
eventProducerLock.lock();
@@ -84,7 +102,7 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider,
LogReaderService currentLogReader = currentTracked.get();
if (currentLogReader != null) {
// we were really using this service;
- // remove the our listener and unget the service
+ // remove our listener and unget the service
currentLogReader.removeLogListener(this);
context.ungetService(currentServiceRef);
// finally null out our tracked reference
@@ -92,6 +110,7 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider,
}
}
}
+
readerService.addLogListener(this);
}
}
@@ -99,14 +118,21 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider,
eventProducerLock.unlock();
}
}
-
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.osgi.util.tracker.ServiceTrackerCustomizer#removedService(org.osgi.framework.ServiceReference, java.lang.Object)
+ */
@Override
public void removedService(ServiceReference<LogReaderService> removedRef, AtomicReference<LogReaderService> removedTracked) {
+
eventProducerLock.lock();
try {
} finally {
LogReaderService removedLogReader = removedTracked.get();
if (removedLogReader != null) {
+ // remove the listener
removedLogReader.removeLogListener(this);
context.ungetService(removedRef);
removedTracked.set(null);
@@ -129,29 +155,17 @@ public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider,
}
}
- @Override
- public PushStream<LogEntry> createStream(Options... options) {
- ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null;
- if (options != null) {
- for (Options option : options) {
- if (Options.HISTORY.equals(option)) {
- withHistory = logReaderService;
- break;
- }
- }
- }
- LogEntrySource logEntrySource = new LogEntrySource(executor, withHistory);
- PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource);
- PushStream<LogEntry> logStream = streamBuilder.unbuffered().withExecutor(executor).create();
- logEntrySources.add(logEntrySource);
- return logStream;
- }
+
+
+ /* It is used to post each log entry to the LogStreamProviderFactory
+ * (non-Javadoc)
+ * @see org.osgi.service.log.LogListener#logged(org.osgi.service.log.LogEntry)
+ */
@Override
public void logged(LogEntry entry) {
- for (LogEntrySource logEntrySource : logEntrySources) {
- logEntrySource.logged(entry);
- }
+
+ logStreamProviderFactory.postLogEntry(entry);
}
}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java
new file mode 100644
index 000000000..00ba1e882
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java
@@ -0,0 +1,103 @@
+/*******************************************************************************
+ * Copyright (c) 2017 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * IBM Corporation - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.equinox.internal.log.stream;
+import org.osgi.service.log.LogEntry;
+import org.osgi.service.log.LogReaderService;
+import org.osgi.service.log.stream.LogStreamProvider;
+import org.osgi.util.tracker.ServiceTracker;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.osgi.framework.Bundle;
+import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceRegistration;
+
+public class LogStreamProviderFactory implements ServiceFactory<LogStreamProvider> {
+
+ Map<Bundle, LogStreamProviderImpl> providers = new HashMap<Bundle,LogStreamProviderImpl>() ;
+ ReentrantReadWriteLock eventProducerLock = new ReentrantReadWriteLock();
+ ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
+
+ public LogStreamProviderFactory(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService){
+ this.logReaderService = logReaderService;
+ }
+
+
+ /*Reader of providers map
+ * 1) for each provider
+ * - post entry to provider
+ */
+ public void postLogEntry(LogEntry entry) {
+ eventProducerLock.readLock().lock();
+ try{
+ for(LogStreamProviderImpl provider : providers.values()){
+ provider.logged(entry);
+ }
+ }
+ finally{
+ eventProducerLock.readLock().unlock();
+ }
+
+ }
+
+ /* Writer to providers map
+ * 1) create new LogStreamProviderImpl
+ * 2) put new instance in map
+ * 3) return new instance
+ * (non-Javadoc)
+ * @see org.osgi.framework.ServiceFactory#getService(org.osgi.framework.Bundle, org.osgi.framework.ServiceRegistration)
+ */
+
+ @Override
+ public LogStreamProviderImpl getService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration) {
+ LogStreamProviderImpl logStreamProviderImpl = new LogStreamProviderImpl(logReaderService);
+ eventProducerLock.writeLock().lock();
+ try{
+ providers.put(bundle, logStreamProviderImpl);
+ return logStreamProviderImpl;
+ }
+ finally{
+ eventProducerLock.writeLock().unlock();
+ }
+ }
+
+ /* 1) Remove the logStreamProviderImpl instance associated with the bundle
+ * 2) close all existing LogStreams from the provider, outside the write lock
+ * (non-Javadoc)
+ * @see org.osgi.framework.ServiceFactory#ungetService(org.osgi.framework.Bundle, org.osgi.framework.ServiceRegistration, java.lang.Object)
+ */
+
+ @Override
+ public void ungetService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration,
+ LogStreamProvider service) {
+
+ LogStreamProviderImpl logStreamProviderImpl;
+
+ eventProducerLock.writeLock().lock();
+ try{
+ logStreamProviderImpl = providers.remove(bundle);
+ }
+ finally{
+ eventProducerLock.writeLock().unlock();
+ }
+
+ logStreamProviderImpl.close();
+
+ }
+
+
+
+
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
new file mode 100644
index 000000000..0ceb63bdd
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
@@ -0,0 +1,117 @@
+/*******************************************************************************
+ * Copyright (c) 2017 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * IBM Corporation - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.equinox.internal.log.stream;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.osgi.service.log.LogEntry;
+import org.osgi.service.log.LogReaderService;
+import org.osgi.service.log.stream.LogStreamProvider;
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamBuilder;
+import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.tracker.ServiceTracker;
+
+public class LogStreamProviderImpl implements LogStreamProvider {
+ private final PushStreamProvider pushStreamProvider = new PushStreamProvider();
+ private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
+ private final WeakHashMap<LogEntrySource,Boolean> weakMap = new WeakHashMap<>();
+ private final Set<LogEntrySource> logEntrySources = Collections.newSetFromMap(weakMap);
+
+ private final ReentrantReadWriteLock historyLock = new ReentrantReadWriteLock();
+
+
+
+ public LogStreamProviderImpl(
+ ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) {
+ this.logReaderService = logReaderService;
+ }
+
+
+ /* Create a PushStream of {@link LogEntry} objects.
+ * The returned PushStream is an unbuffered stream with a parallelism of one.
+ * (non-Javadoc)
+ * @see org.osgi.service.log.stream.LogStreamProvider#createStream(org.osgi.service.log.stream.LogStreamProvider.Options[])
+ */
+
+ @Override
+ public PushStream<LogEntry> createStream(Options... options) {
+ ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null;
+ if (options != null) {
+ for (Options option : options) {
+ if (Options.HISTORY.equals(option)) {
+ withHistory = logReaderService;
+ }
+ }
+ }
+
+
+ // A write lock is acquired in order to add logEntrySource into the Set of logEntrySources.
+ historyLock.writeLock().lock();
+ try{
+ LogEntrySource logEntrySource = new LogEntrySource(withHistory);
+ PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource);
+ //creating an unbuffered stream
+ PushStream<LogEntry> logStream = streamBuilder.unbuffered().create();
+ logEntrySource.setLogStream(logStream);
+ // Adding to sources makes the source start listening for new entries
+ logEntrySources.add(logEntrySource);
+ return logStream;
+ }
+ finally{
+ historyLock.writeLock().unlock();
+ }
+ }
+
+ /*
+ * Send the incoming log entries to the logEntrySource.logged(entry) for the consumer to accept it.
+ */
+ public void logged(LogEntry entry) {
+ historyLock.readLock().lock();
+ try{
+ for (LogEntrySource logEntrySource : logEntrySources) {
+ logEntrySource.logged(entry);
+ }
+ }
+ finally{
+ historyLock.readLock().unlock();
+ }
+ }
+
+
+ /*
+ * Closing the stream for each source.
+ */
+ public void close() {
+ PushStream<LogEntry> logStream;
+ historyLock.readLock().lock();
+ try{
+ for(LogEntrySource logEntrySource : logEntrySources) {
+ logStream = logEntrySource.getLogStream();
+ try {
+ logStream.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }finally{
+ historyLock.readLock().unlock();
+ }
+ }
+
+}

Back to the top