diff options
Diffstat (limited to 'bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java')
-rw-r--r-- | bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java | 183 |
1 files changed, 167 insertions, 16 deletions
diff --git a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java b/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java index 18a62cc11..823ac2453 100644 --- a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java +++ b/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java @@ -10,12 +10,29 @@ package org.eclipse.osgi.internal.log; import java.io.PrintStream; import java.security.AccessController; import java.security.PrivilegedAction; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.eclipse.equinox.log.LogFilter; import org.eclipse.equinox.log.SynchronousLogListener; import org.eclipse.osgi.framework.util.ArrayMap; -import org.osgi.framework.*; -import org.osgi.service.log.*; +import org.eclipse.osgi.internal.framework.EquinoxContainer; +import org.eclipse.osgi.internal.log.OrderedExecutor.OrderedTaskQueue; +import org.osgi.framework.Bundle; +import org.osgi.framework.ServiceFactory; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.log.LogEntry; +import org.osgi.service.log.LogLevel; +import org.osgi.service.log.LogListener; public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedLogReaderServiceImpl> { @@ -48,7 +65,7 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL private static PrintStream errorStream; - private final BasicReadWriteLock listenersLock = new BasicReadWriteLock(); + private final ReentrantReadWriteLock listenersLock = new ReentrantReadWriteLock(); private ArrayMap<LogListener, Object[]> listeners = new ArrayMap<>(5); private LogFilter[] filters = null; private final ThreadLocal<int[]> nestedCallCount = new ThreadLocal<>(); @@ -56,6 +73,8 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL private final int maxHistory; private final LogLevel defaultLevel; + private OrderedExecutor executor; + static boolean safeIsLoggable(LogFilter filter, Bundle bundle, String name, int level) { try { return filter.isLoggable(bundle, name, level); @@ -108,6 +127,14 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL } } + public void start(EquinoxContainer equinoxContainer) { + executor = new OrderedExecutor(equinoxContainer); + } + + public void stop() { + executor.shutdown(); + } + public LogLevel getDefaultLogLevel() { return defaultLevel; } @@ -133,11 +160,11 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL boolean isLoggablePrivileged(Bundle bundle, String name, int level) { LogFilter[] filtersCopy; - listenersLock.readLock(); + listenersLock.readLock().lock(); try { filtersCopy = filters; } finally { - listenersLock.readUnlock(); + listenersLock.readLock().unlock(); } try { if (incrementNestedCount() == MAX_RECURSIONS) @@ -199,11 +226,11 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL LogEntry logEntry = new ExtendedLogEntryImpl(bundle, name, stackTraceElement, context, logLevelEnum, level, message, exception); storeEntry(logEntry); ArrayMap<LogListener, Object[]> listenersCopy; - listenersLock.readLock(); + listenersLock.readLock().lock(); try { listenersCopy = listeners; } finally { - listenersLock.readUnlock(); + listenersLock.readLock().unlock(); } try { if (incrementNestedCount() >= MAX_RECURSIONS) @@ -214,9 +241,9 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL LogFilter filter = (LogFilter) listenerObjects[0]; if (safeIsLoggable(filter, bundle, name, level)) { LogListener listener = listenersCopy.getKey(i); - SerializedTaskQueue taskQueue = (SerializedTaskQueue) listenerObjects[1]; - if (taskQueue != null) { - taskQueue.put(new LogTask(logEntry, listener)); + OrderedTaskQueue orderedTaskQueue = (OrderedTaskQueue) listenerObjects[1]; + if (orderedTaskQueue != null) { + orderedTaskQueue.execute(new LogTask(logEntry, listener), size); } else { // log synchronously safeLogged(listener, logEntry); @@ -240,13 +267,13 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL } void addLogListener(LogListener listener, LogFilter filter) { - listenersLock.writeLock(); + listenersLock.writeLock().lock(); try { ArrayMap<LogListener, Object[]> listenersCopy = new ArrayMap<>(listeners.getKeys(), listeners.getValues()); Object[] listenerObjects = listenersCopy.get(listener); if (listenerObjects == null) { // Only create a task queue for non-SynchronousLogListeners - SerializedTaskQueue taskQueue = (listener instanceof SynchronousLogListener) ? null : new SerializedTaskQueue(listener.toString()); + OrderedTaskQueue taskQueue = (listener instanceof SynchronousLogListener) ? null : executor.createQueue(); listenerObjects = new Object[] {filter, taskQueue}; } else if (filter != listenerObjects[0]) { // update the filter @@ -256,7 +283,7 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL recalculateFilters(listenersCopy); listeners = listenersCopy; } finally { - listenersLock.writeUnlock(); + listenersLock.writeLock().unlock(); } } @@ -280,14 +307,14 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL } void removeLogListener(LogListener listener) { - listenersLock.writeLock(); + listenersLock.writeLock().lock(); try { ArrayMap<LogListener, Object[]> listenersCopy = new ArrayMap<>(listeners.getKeys(), listeners.getValues()); listenersCopy.remove(listener); recalculateFilters(listenersCopy); listeners = listenersCopy; } finally { - listenersLock.writeUnlock(); + listenersLock.writeLock().unlock(); } } @@ -301,3 +328,127 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL } } + +/** +* This Executor uses OrderedTaskQueue to execute tasks in a FIFO order. +*/ +class OrderedExecutor implements ThreadFactory { + private final int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), 10); + private final String logThreadName; + private final ThreadPoolExecutor delegate; + private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); + private int coreSize = 0; + + public OrderedExecutor(final EquinoxContainer equinoxContainer) { + this.logThreadName = "Equinox Log Thread - " + equinoxContainer.toString(); //$NON-NLS-1$ + this.delegate = new ThreadPoolExecutor(0, nThreads, 10L, TimeUnit.SECONDS, queue, this); + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, logThreadName); + t.setDaemon(true); + return t; + } + + void executeOrderedTask(Runnable task, OrderedTaskQueue dependencyQueue, int numListeners) { + OrderedTaskQueue.OrderedTask firstOrderedTask; + synchronized (this) { + // This will either queue the task to wait for existing ones to finish + // or it will return the first ordered task so it can kick off the execution + // for ordered task queue + firstOrderedTask = dependencyQueue.addTaskAndReturnIfFirst(task); + if (firstOrderedTask != null) { + // Check that we are at the optimal target for core pool size + int targetSize = Math.min(nThreads, numListeners); + if (coreSize < targetSize) { + coreSize = targetSize; + delegate.setCorePoolSize(coreSize); + } + } + } + + // execute the first ordered task now + if (firstOrderedTask != null) { + delegate.execute(firstOrderedTask); + } + } + + OrderedTaskQueue createQueue() { + return new OrderedTaskQueue(); + } + + void shutdown() { + delegate.shutdown(); + } + + void executeNextTask(OrderedTaskQueue taskQueue) { + OrderedTaskQueue.OrderedTask nextTask; + synchronized (this) { + nextTask = taskQueue.getNextTask(); + if (nextTask == null && queue.isEmpty()) { + // The event storm has ended, let the threads be reclaimed + delegate.setCorePoolSize(0); + coreSize = 0; + } + } + if (nextTask != null) { + delegate.execute(nextTask); + } + } + + /** + * Keeps an list of ordered tasks and guarantees the tasks are run in the order + * they are queued. Tasks executed with this queue will always be run + * in FIFO order and will never run in parallel to guarantee events are + * received in the proper order by the listener. Each log listener + * has its own ordered task queue. + * <p> + * Note that only the execute method is thread safe. All other methods + * must be guarded by the OrderedExecutor monitor. + */ + class OrderedTaskQueue { + private final Queue<OrderedTask> dependencyQueue = new LinkedList<>(); + private AtomicReference<OrderedTask> firstTask = new AtomicReference<>(); + + void execute(Runnable task, int numListeners) { + executeOrderedTask(task, this, numListeners); + } + + OrderedTask addTaskAndReturnIfFirst(Runnable task) { + OrderedTask orderedTask = new OrderedTask(task); + if (firstTask.compareAndSet(null, orderedTask)) { + return orderedTask; + } + dependencyQueue.add(orderedTask); + return null; + } + + OrderedTask getNextTask() { + OrderedTask nextTask = dependencyQueue.poll(); + if (nextTask == null) { + // The queue has been drained reset the first task. + // the next task for this ordered task queue will become the first again + firstTask.set(null); + } + return nextTask; + } + + class OrderedTask implements Runnable { + private final Runnable task; + + public OrderedTask(Runnable task) { + this.task = task; + } + + @Override + public void run() { + try { + task.run(); + } finally { + executeNextTask(OrderedTaskQueue.this); + } + } + } + } +} |