Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 0706ef1b99c18d657b9bd6b1aca4c4c5d1a9a6c1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*******************************************************************************
 * Copyright (c) 2017 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 * 
 * Contributors:
 *     IBM Corporation - initial API and implementation
 *******************************************************************************/
package org.eclipse.equinox.internal.log.stream;

import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.osgi.service.log.LogEntry;
import org.osgi.service.log.LogReaderService;
import org.osgi.service.log.stream.LogStreamProvider;
import org.osgi.util.pushstream.PushEvent;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.PushStreamBuilder;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.QueuePolicyOption;
import org.osgi.util.tracker.ServiceTracker;

/**
 * {@link LogStreamProvider} implementation that creates buffered
 * {@link PushStream}s of {@link LogEntry} objects and forwards every entry
 * passed to {@link #logged(LogEntry)} to each source it has created.
 * <p>
 * The created sources are kept in a set backed by a {@link WeakHashMap}.
 * Access to that set is guarded by {@code historyLock}: the write lock is
 * taken while a new source is registered, the read lock while the set is
 * iterated.
 */
public class LogStreamProviderImpl implements LogStreamProvider {
	private final PushStreamProvider pushStreamProvider = new PushStreamProvider();
	private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
	// Set view over a WeakHashMap, so the sources are only weakly referenced from here.
	private final Set<LogEntrySource> logEntrySources = Collections.newSetFromMap(new WeakHashMap<LogEntrySource, Boolean>());

	private final ReentrantReadWriteLock historyLock = new ReentrantReadWriteLock();
	private final ExecutorService executor;

	/**
	 * @param logReaderService tracker handed to a new source when the caller
	 *        asks for {@code Options.HISTORY}
	 * @param executor executor shared by every stream built by this provider
	 */
	public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService, ExecutorService executor) {
		this.logReaderService = logReaderService;
		this.executor = executor;
	}

	/**
	 * Creates a PushStream of {@link LogEntry} objects.
	 * <p>
	 * The returned PushStream is:
	 * <ul>
	 * <li>buffered, using a {@link LinkedBlockingQueue} created without a
	 * capacity argument, so that it can contain the history, if included;</li>
	 * <li>configured with the {@code QueuePolicyOption.DISCARD_OLDEST} queue policy;</li>
	 * <li>run on the shared executor;</li>
	 * <li>limited to a parallelism of one.</li>
	 * </ul>
	 *
	 * @param options stream options; may be {@code null} or empty. When
	 *        {@code Options.HISTORY} is present the new source is given the
	 *        log reader service tracker, otherwise it is given {@code null}.
	 * @return the newly built stream
	 * @see org.osgi.service.log.stream.LogStreamProvider#createStream(org.osgi.service.log.stream.LogStreamProvider.Options[])
	 */
	@Override
	public PushStream<LogEntry> createStream(Options... options) {
		final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> historyTracker;
		if (requestsHistory(options)) {
			historyTracker = logReaderService;
		} else {
			historyTracker = null;
		}

		// The write lock is held for the whole construction so that the new
		// source is added to the set of sources while no reader is iterating it.
		final ReentrantReadWriteLock.WriteLock registrationLock = historyLock.writeLock();
		registrationLock.lock();
		try {
			LogEntrySource source = new LogEntrySource(historyTracker);
			PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> builder = pushStreamProvider.buildStream(source);
			// Queue backing the buffered push stream.
			LinkedBlockingQueue<PushEvent<? extends LogEntry>> buffer = new LinkedBlockingQueue<>();
			PushStream<LogEntry> stream = builder
					.withBuffer(buffer)
					.withExecutor(executor)
					.withParallelism(1)
					.withQueuePolicy(QueuePolicyOption.DISCARD_OLDEST)
					.build();
			source.setLogStream(stream);
			// Once it is in the set, logged(LogEntry) starts forwarding entries to this source.
			logEntrySources.add(source);
			return stream;
		} finally {
			registrationLock.unlock();
		}
	}

	/**
	 * Tells whether {@code Options.HISTORY} is among the given options.
	 *
	 * @param options the options passed to createStream; may be {@code null}
	 * @return {@code true} if at least one element is {@code Options.HISTORY}
	 */
	private static boolean requestsHistory(Options[] options) {
		if (options == null) {
			return false;
		}
		for (Options candidate : options) {
			if (candidate == Options.HISTORY) {
				return true;
			}
		}
		return false;
	}

	/**
	 * Forwards an incoming log entry to every registered source by calling
	 * its {@code logged(entry)} method, while holding the read lock.
	 *
	 * @param entry the log entry to forward
	 */
	public void logged(LogEntry entry) {
		final ReentrantReadWriteLock.ReadLock iterationLock = historyLock.readLock();
		iterationLock.lock();
		try {
			for (LogEntrySource source : logEntrySources) {
				source.logged(entry);
			}
		} finally {
			iterationLock.unlock();
		}
	}

	/**
	 * Closes the stream of every registered source, while holding the read
	 * lock. An exception raised while closing one stream has its stack trace
	 * printed and does not prevent the remaining streams from being closed.
	 */
	public void close() {
		final ReentrantReadWriteLock.ReadLock iterationLock = historyLock.readLock();
		iterationLock.lock();
		try {
			for (LogEntrySource source : logEntrySources) {
				PushStream<LogEntry> stream = source.getLogStream();
				try {
					stream.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		} finally {
			iterationLock.unlock();
		}
	}

}

Back to the top