Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java')
-rw-r--r--bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java108
1 files changed, 108 insertions, 0 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
new file mode 100644
index 000000000..e32bf225e
--- /dev/null
+++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
@@ -0,0 +1,108 @@
+/*******************************************************************************
+ * Copyright (c) 2017 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * IBM Corporation - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.equinox.internal.log.stream;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.osgi.service.log.LogEntry;
+import org.osgi.service.log.LogReaderService;
+import org.osgi.service.log.stream.LogStreamProvider;
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamBuilder;
+import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.tracker.ServiceTracker;
+
+public class LogStreamProviderImpl implements LogStreamProvider {
+ private final PushStreamProvider pushStreamProvider = new PushStreamProvider();
+ private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
+ private final WeakHashMap<LogEntrySource, Boolean> weakMap = new WeakHashMap<>();
+ private final Set<LogEntrySource> logEntrySources = Collections.newSetFromMap(weakMap);
+
+ private final ReentrantReadWriteLock historyLock = new ReentrantReadWriteLock();
+
+ public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) {
+ this.logReaderService = logReaderService;
+ }
+
+ /* Create a PushStream of {@link LogEntry} objects.
+ * The returned PushStream is an unbuffered stream with a parallelism of one.
+ * (non-Javadoc)
+ * @see org.osgi.service.log.stream.LogStreamProvider#createStream(org.osgi.service.log.stream.LogStreamProvider.Options[])
+ */
+
+ @Override
+ public PushStream<LogEntry> createStream(Options... options) {
+ ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null;
+ if (options != null) {
+ for (Options option : options) {
+ if (Options.HISTORY.equals(option)) {
+ withHistory = logReaderService;
+ }
+ }
+ }
+
+ // A write lock is acquired in order to add logEntrySource into the Set of logEntrySources.
+ historyLock.writeLock().lock();
+ try {
+ LogEntrySource logEntrySource = new LogEntrySource(withHistory);
+ PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource);
+ //creating an unbuffered stream
+ PushStream<LogEntry> logStream = streamBuilder.unbuffered().create();
+ logEntrySource.setLogStream(logStream);
+ // Adding to sources makes the source start listening for new entries
+ logEntrySources.add(logEntrySource);
+ return logStream;
+ } finally {
+ historyLock.writeLock().unlock();
+ }
+ }
+
+ /*
+ * Send the incoming log entries to the logEntrySource.logged(entry) for the consumer to accept it.
+ */
+ public void logged(LogEntry entry) {
+ historyLock.readLock().lock();
+ try {
+ for (LogEntrySource logEntrySource : logEntrySources) {
+ logEntrySource.logged(entry);
+ }
+ } finally {
+ historyLock.readLock().unlock();
+ }
+ }
+
+ /*
+ * Closing the stream for each source.
+ */
+ public void close() {
+ PushStream<LogEntry> logStream;
+ historyLock.readLock().lock();
+ try {
+ for (LogEntrySource logEntrySource : logEntrySources) {
+ logStream = logEntrySource.getLogStream();
+ try {
+ logStream.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ } finally {
+ historyLock.readLock().unlock();
+ }
+ }
+
+}

Back to the top