Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java')
-rw-r--r--bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java183
1 files changed, 167 insertions, 16 deletions
diff --git a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java b/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java
index 18a62cc11..823ac2453 100644
--- a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java
+++ b/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java
@@ -10,12 +10,29 @@ package org.eclipse.osgi.internal.log;
import java.io.PrintStream;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.eclipse.equinox.log.LogFilter;
import org.eclipse.equinox.log.SynchronousLogListener;
import org.eclipse.osgi.framework.util.ArrayMap;
-import org.osgi.framework.*;
-import org.osgi.service.log.*;
+import org.eclipse.osgi.internal.framework.EquinoxContainer;
+import org.eclipse.osgi.internal.log.OrderedExecutor.OrderedTaskQueue;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.log.LogEntry;
+import org.osgi.service.log.LogLevel;
+import org.osgi.service.log.LogListener;
public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedLogReaderServiceImpl> {
@@ -48,7 +65,7 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
private static PrintStream errorStream;
- private final BasicReadWriteLock listenersLock = new BasicReadWriteLock();
+ private final ReentrantReadWriteLock listenersLock = new ReentrantReadWriteLock();
private ArrayMap<LogListener, Object[]> listeners = new ArrayMap<>(5);
private LogFilter[] filters = null;
private final ThreadLocal<int[]> nestedCallCount = new ThreadLocal<>();
@@ -56,6 +73,8 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
private final int maxHistory;
private final LogLevel defaultLevel;
+ private OrderedExecutor executor;
+
static boolean safeIsLoggable(LogFilter filter, Bundle bundle, String name, int level) {
try {
return filter.isLoggable(bundle, name, level);
@@ -108,6 +127,14 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
}
}
+ public void start(EquinoxContainer equinoxContainer) {
+ executor = new OrderedExecutor(equinoxContainer);
+ }
+
+ public void stop() {
+ executor.shutdown();
+ }
+
public LogLevel getDefaultLogLevel() {
return defaultLevel;
}
@@ -133,11 +160,11 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
boolean isLoggablePrivileged(Bundle bundle, String name, int level) {
LogFilter[] filtersCopy;
- listenersLock.readLock();
+ listenersLock.readLock().lock();
try {
filtersCopy = filters;
} finally {
- listenersLock.readUnlock();
+ listenersLock.readLock().unlock();
}
try {
if (incrementNestedCount() == MAX_RECURSIONS)
@@ -199,11 +226,11 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
LogEntry logEntry = new ExtendedLogEntryImpl(bundle, name, stackTraceElement, context, logLevelEnum, level, message, exception);
storeEntry(logEntry);
ArrayMap<LogListener, Object[]> listenersCopy;
- listenersLock.readLock();
+ listenersLock.readLock().lock();
try {
listenersCopy = listeners;
} finally {
- listenersLock.readUnlock();
+ listenersLock.readLock().unlock();
}
try {
if (incrementNestedCount() >= MAX_RECURSIONS)
@@ -214,9 +241,9 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
LogFilter filter = (LogFilter) listenerObjects[0];
if (safeIsLoggable(filter, bundle, name, level)) {
LogListener listener = listenersCopy.getKey(i);
- SerializedTaskQueue taskQueue = (SerializedTaskQueue) listenerObjects[1];
- if (taskQueue != null) {
- taskQueue.put(new LogTask(logEntry, listener));
+ OrderedTaskQueue orderedTaskQueue = (OrderedTaskQueue) listenerObjects[1];
+ if (orderedTaskQueue != null) {
+ orderedTaskQueue.execute(new LogTask(logEntry, listener), size);
} else {
// log synchronously
safeLogged(listener, logEntry);
@@ -240,13 +267,13 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
}
void addLogListener(LogListener listener, LogFilter filter) {
- listenersLock.writeLock();
+ listenersLock.writeLock().lock();
try {
ArrayMap<LogListener, Object[]> listenersCopy = new ArrayMap<>(listeners.getKeys(), listeners.getValues());
Object[] listenerObjects = listenersCopy.get(listener);
if (listenerObjects == null) {
// Only create a task queue for non-SynchronousLogListeners
- SerializedTaskQueue taskQueue = (listener instanceof SynchronousLogListener) ? null : new SerializedTaskQueue(listener.toString());
+ OrderedTaskQueue taskQueue = (listener instanceof SynchronousLogListener) ? null : executor.createQueue();
listenerObjects = new Object[] {filter, taskQueue};
} else if (filter != listenerObjects[0]) {
// update the filter
@@ -256,7 +283,7 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
recalculateFilters(listenersCopy);
listeners = listenersCopy;
} finally {
- listenersLock.writeUnlock();
+ listenersLock.writeLock().unlock();
}
}
@@ -280,14 +307,14 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
}
void removeLogListener(LogListener listener) {
- listenersLock.writeLock();
+ listenersLock.writeLock().lock();
try {
ArrayMap<LogListener, Object[]> listenersCopy = new ArrayMap<>(listeners.getKeys(), listeners.getValues());
listenersCopy.remove(listener);
recalculateFilters(listenersCopy);
listeners = listenersCopy;
} finally {
- listenersLock.writeUnlock();
+ listenersLock.writeLock().unlock();
}
}
@@ -301,3 +328,127 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
}
}
+
+/**
+* This Executor uses OrderedTaskQueue to execute tasks in a FIFO order.
+*/
+class OrderedExecutor implements ThreadFactory {
+ private final int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), 10);
+ private final String logThreadName;
+ private final ThreadPoolExecutor delegate;
+ private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+ private int coreSize = 0;
+
+ public OrderedExecutor(final EquinoxContainer equinoxContainer) {
+ this.logThreadName = "Equinox Log Thread - " + equinoxContainer.toString(); //$NON-NLS-1$
+ this.delegate = new ThreadPoolExecutor(0, nThreads, 10L, TimeUnit.SECONDS, queue, this);
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, logThreadName);
+ t.setDaemon(true);
+ return t;
+ }
+
+ void executeOrderedTask(Runnable task, OrderedTaskQueue dependencyQueue, int numListeners) {
+ OrderedTaskQueue.OrderedTask firstOrderedTask;
+ synchronized (this) {
+ // This will either queue the task to wait for existing ones to finish
+ // or it will return the first ordered task so it can kick off the execution
+ // for ordered task queue
+ firstOrderedTask = dependencyQueue.addTaskAndReturnIfFirst(task);
+ if (firstOrderedTask != null) {
+ // Check that we are at the optimal target for core pool size
+ int targetSize = Math.min(nThreads, numListeners);
+ if (coreSize < targetSize) {
+ coreSize = targetSize;
+ delegate.setCorePoolSize(coreSize);
+ }
+ }
+ }
+
+ // execute the first ordered task now
+ if (firstOrderedTask != null) {
+ delegate.execute(firstOrderedTask);
+ }
+ }
+
+ OrderedTaskQueue createQueue() {
+ return new OrderedTaskQueue();
+ }
+
+ void shutdown() {
+ delegate.shutdown();
+ }
+
+ void executeNextTask(OrderedTaskQueue taskQueue) {
+ OrderedTaskQueue.OrderedTask nextTask;
+ synchronized (this) {
+ nextTask = taskQueue.getNextTask();
+ if (nextTask == null && queue.isEmpty()) {
+ // The event storm has ended, let the threads be reclaimed
+ delegate.setCorePoolSize(0);
+ coreSize = 0;
+ }
+ }
+ if (nextTask != null) {
+ delegate.execute(nextTask);
+ }
+ }
+
+ /**
+ * Keeps an list of ordered tasks and guarantees the tasks are run in the order
+ * they are queued. Tasks executed with this queue will always be run
+ * in FIFO order and will never run in parallel to guarantee events are
+ * received in the proper order by the listener. Each log listener
+ * has its own ordered task queue.
+ * <p>
+ * Note that only the execute method is thread safe. All other methods
+ * must be guarded by the OrderedExecutor monitor.
+ */
+ class OrderedTaskQueue {
+ private final Queue<OrderedTask> dependencyQueue = new LinkedList<>();
+ private AtomicReference<OrderedTask> firstTask = new AtomicReference<>();
+
+ void execute(Runnable task, int numListeners) {
+ executeOrderedTask(task, this, numListeners);
+ }
+
+ OrderedTask addTaskAndReturnIfFirst(Runnable task) {
+ OrderedTask orderedTask = new OrderedTask(task);
+ if (firstTask.compareAndSet(null, orderedTask)) {
+ return orderedTask;
+ }
+ dependencyQueue.add(orderedTask);
+ return null;
+ }
+
+ OrderedTask getNextTask() {
+ OrderedTask nextTask = dependencyQueue.poll();
+ if (nextTask == null) {
+ // The queue has been drained reset the first task.
+ // the next task for this ordered task queue will become the first again
+ firstTask.set(null);
+ }
+ return nextTask;
+ }
+
+ class OrderedTask implements Runnable {
+ private final Runnable task;
+
+ public OrderedTask(Runnable task) {
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ try {
+ task.run();
+ } finally {
+ executeNextTask(OrderedTaskQueue.this);
+ }
+ }
+ }
+ }
+}

Back to the top