Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnjum Fatima2017-11-30 17:00:26 -0500
committerThomas Watson2017-12-15 11:31:39 -0500
commit90eb309b4bf2228e6012ec868fd38f6bc550ddb3 (patch)
treed88f6aad29f306ead0f5584f3b1fc0f86ff1a71d
parent4228f684eb8e2a17664dd6e3246650f4326be42d (diff)
downloadrt.equinox.framework-90eb309b4bf2228e6012ec868fd38f6bc550ddb3.tar.gz
rt.equinox.framework-90eb309b4bf2228e6012ec868fd38f6bc550ddb3.tar.xz
rt.equinox.framework-90eb309b4bf2228e6012ec868fd38f6bc550ddb3.zip
Bug 528121 - Add executor to control the creation of threads for
multiple log listeners The executor is an ordered executor which warrants the task ordering for tasks with same key. Earlier implementation takes endless time and run out of memory when more listeners are added. Change-Id: I47dfaedaccc4033ebc977a9a53492d28d252d59d Signed-off-by: Anjum Fatima <anjum.eclipse@gmail.com> Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
-rw-r--r--bundles/org.eclipse.osgi.tests/src/org/eclipse/equinox/log/test/ExtendedLogReaderServiceTest.java11
-rw-r--r--bundles/org.eclipse.osgi.tests/src/org/eclipse/equinox/log/test/TestListener2.java44
-rwxr-xr-xbundles/org.eclipse.osgi.tests/src/org/eclipse/osgi/tests/bundles/SystemBundleTests.java136
-rw-r--r--bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/BasicReadWriteLock.java69
-rw-r--r--bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java183
-rw-r--r--bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/LogServiceManager.java33
-rw-r--r--bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/SerializedTaskQueue.java58
7 files changed, 376 insertions, 158 deletions
diff --git a/bundles/org.eclipse.osgi.tests/src/org/eclipse/equinox/log/test/ExtendedLogReaderServiceTest.java b/bundles/org.eclipse.osgi.tests/src/org/eclipse/equinox/log/test/ExtendedLogReaderServiceTest.java
index 5d4100855..9944cb1a3 100644
--- a/bundles/org.eclipse.osgi.tests/src/org/eclipse/equinox/log/test/ExtendedLogReaderServiceTest.java
+++ b/bundles/org.eclipse.osgi.tests/src/org/eclipse/equinox/log/test/ExtendedLogReaderServiceTest.java
@@ -11,11 +11,18 @@ import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import junit.framework.TestCase;
-import org.eclipse.equinox.log.*;
+import org.eclipse.equinox.log.ExtendedLogEntry;
+import org.eclipse.equinox.log.ExtendedLogReaderService;
+import org.eclipse.equinox.log.ExtendedLogService;
+import org.eclipse.equinox.log.LogFilter;
+import org.eclipse.equinox.log.SynchronousLogListener;
import org.eclipse.osgi.tests.OSGiTestsActivator;
import org.osgi.framework.Bundle;
import org.osgi.framework.ServiceReference;
-import org.osgi.service.log.*;
+import org.osgi.service.log.LogEntry;
+import org.osgi.service.log.LogLevel;
+import org.osgi.service.log.LogListener;
+import org.osgi.service.log.LogService;
import org.osgi.service.log.Logger;
import org.osgi.service.log.admin.LoggerAdmin;
import org.osgi.service.log.admin.LoggerContext;
diff --git a/bundles/org.eclipse.osgi.tests/src/org/eclipse/equinox/log/test/TestListener2.java b/bundles/org.eclipse.osgi.tests/src/org/eclipse/equinox/log/test/TestListener2.java
new file mode 100644
index 000000000..63fb33791
--- /dev/null
+++ b/bundles/org.eclipse.osgi.tests/src/org/eclipse/equinox/log/test/TestListener2.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * Copyright (c) 2006, 2017 Cognos Incorporated, IBM Corporation and others
+ * All rights reserved. This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License v1.0 which
+ * accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ ******************************************************************************/
+package org.eclipse.equinox.log.test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import junit.framework.TestCase;
+import org.osgi.service.log.LogEntry;
+import org.osgi.service.log.LogListener;
+
+public class TestListener2 extends TestCase implements LogListener {
+ private List<String> list;
+ private AtomicInteger flag = new AtomicInteger(0);
+ CountDownLatch latch;
+
+ public TestListener2(CountDownLatch countDownLatch) {
+ this.list = Collections.synchronizedList(new ArrayList());
+ this.latch = countDownLatch;
+ }
+
+ @Override
+ public void logged(LogEntry entry) {
+ //logged is never called in parallel. Added a check to see if two threads are accessing the logged method at the same time.
+ assertTrue(flag.compareAndSet(0, 1));
+ if (entry.getBundle().getSymbolicName().equals("org.eclipse.osgi")) {
+ list.add(entry.getMessage());
+ latch.countDown();
+ }
+ flag.set(0);
+ }
+
+ public List<String> getLogs() {
+ return this.list;
+ }
+
+}
diff --git a/bundles/org.eclipse.osgi.tests/src/org/eclipse/osgi/tests/bundles/SystemBundleTests.java b/bundles/org.eclipse.osgi.tests/src/org/eclipse/osgi/tests/bundles/SystemBundleTests.java
index 40f6d98a8..3dc4f7ed0 100755
--- a/bundles/org.eclipse.osgi.tests/src/org/eclipse/osgi/tests/bundles/SystemBundleTests.java
+++ b/bundles/org.eclipse.osgi.tests/src/org/eclipse/osgi/tests/bundles/SystemBundleTests.java
@@ -10,18 +10,54 @@
*******************************************************************************/
package org.eclipse.osgi.tests.bundles;
-import java.io.*;
-import java.net.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.Proxy;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLConnection;
import java.security.Permission;
import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.jar.*;
+import java.util.jar.Attributes;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
import javax.net.SocketFactory;
import junit.framework.Test;
import junit.framework.TestSuite;
import org.eclipse.core.runtime.adaptor.EclipseStarter;
+import org.eclipse.equinox.log.ExtendedLogReaderService;
+import org.eclipse.equinox.log.ExtendedLogService;
+import org.eclipse.equinox.log.test.TestListener2;
import org.eclipse.osgi.framework.util.FilePath;
import org.eclipse.osgi.internal.framework.EquinoxConfiguration;
import org.eclipse.osgi.internal.location.EquinoxLocations;
@@ -32,7 +68,17 @@ import org.eclipse.osgi.storage.url.reference.Handler;
import org.eclipse.osgi.tests.OSGiTestsActivator;
import org.eclipse.osgi.tests.security.BaseSecurityTest;
import org.junit.Assert;
-import org.osgi.framework.*;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleEvent;
+import org.osgi.framework.BundleException;
+import org.osgi.framework.BundleListener;
+import org.osgi.framework.Constants;
+import org.osgi.framework.FrameworkEvent;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.SynchronousBundleListener;
import org.osgi.framework.hooks.resolver.ResolverHook;
import org.osgi.framework.hooks.resolver.ResolverHookFactory;
import org.osgi.framework.hooks.weaving.WeavingHook;
@@ -40,14 +86,22 @@ import org.osgi.framework.hooks.weaving.WovenClass;
import org.osgi.framework.launch.Framework;
import org.osgi.framework.launch.FrameworkFactory;
import org.osgi.framework.namespace.NativeNamespace;
-import org.osgi.framework.wiring.*;
+import org.osgi.framework.wiring.BundleCapability;
+import org.osgi.framework.wiring.BundleRequirement;
+import org.osgi.framework.wiring.BundleRevision;
+import org.osgi.framework.wiring.BundleWiring;
+import org.osgi.framework.wiring.FrameworkWiring;
import org.osgi.resource.Capability;
import org.osgi.resource.Requirement;
-import org.osgi.service.log.*;
+import org.osgi.service.log.LogEntry;
+import org.osgi.service.log.LogLevel;
+import org.osgi.service.log.LogReaderService;
import org.osgi.service.packageadmin.ExportedPackage;
import org.osgi.service.packageadmin.PackageAdmin;
import org.osgi.service.startlevel.StartLevel;
-import org.osgi.service.url.*;
+import org.osgi.service.url.AbstractURLStreamHandlerService;
+import org.osgi.service.url.URLConstants;
+import org.osgi.service.url.URLStreamHandlerService;
public class SystemBundleTests extends AbstractBundleTests {
public static Test suite() {
@@ -3133,6 +3187,70 @@ public class SystemBundleTests extends AbstractBundleTests {
}
}
+ public void testLogOrderMultipleListeners() throws InterruptedException {
+ File config = OSGiTestsActivator.getContext().getDataFile(getName()); //$NON-NLS-1$
+ Map configuration = new HashMap();
+ configuration.put(Constants.FRAMEWORK_STORAGE, config.getAbsolutePath());
+ Equinox equinox = null;
+
+ try {
+ equinox = new Equinox(configuration);
+ equinox.start();
+ doLoggingOnMultipleListeners(equinox);
+ equinox.stop();
+ equinox.waitForStop(1000);
+ equinox.start();
+ doLoggingOnMultipleListeners(equinox);
+
+ } catch (BundleException e) {
+ fail("Failed init", e);
+ } finally {
+ try {
+ if (equinox != null) {
+ equinox.stop();
+ equinox.waitForStop(1000);
+ }
+ } catch (BundleException e) {
+ fail("Failed to stop framework.", e);
+ } catch (InterruptedException e) {
+ fail("Failed to stop framework.", e);
+ }
+ }
+ }
+
+ public static void doLoggingOnMultipleListeners(Equinox equinox) throws InterruptedException {
+ int listenersSize = 100;
+ int logSize = 10000;
+ BundleContext bc = equinox.getBundleContext();
+ ExtendedLogReaderService logReader = bc.getService(bc.getServiceReference(ExtendedLogReaderService.class));
+ ExtendedLogService log = bc.getService(bc.getServiceReference(ExtendedLogService.class));
+ ArrayList<TestListener2> listeners = new ArrayList<TestListener2>();
+ CountDownLatch latch = new CountDownLatch(logSize * listenersSize);
+
+ for (int i = 0; i < listenersSize; i++) {
+ TestListener2 listener = new TestListener2(latch);
+ listeners.add(listener);
+ logReader.addLogListener(listener);
+ }
+
+ for (int i = 0; i < logSize; i++) {
+ log.warn(String.valueOf(i));
+ }
+
+ latch.await(10, TimeUnit.SECONDS);
+ assertEquals("Failed to log all entries", 0, latch.getCount());
+
+ int expected = 0;
+ for (String msg : listeners.get(0).getLogs()) {
+ assertEquals("Unexpected log found.", expected, Integer.parseInt(msg));
+ expected++;
+ }
+
+ for (int i = 1; i < listenersSize; i++) {
+ assertTrue(listeners.get(i).getLogs().equals(listeners.get(0).getLogs()));
+ }
+ }
+
public void testSystemCapabilitiesBug522125() throws URISyntaxException, FileNotFoundException, IOException, BundleException, InterruptedException {
String frameworkLocation = OSGiTestsActivator.getContext().getProperty(EquinoxConfiguration.PROP_FRAMEWORK);
URI uri = new URI(frameworkLocation);
diff --git a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/BasicReadWriteLock.java b/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/BasicReadWriteLock.java
deleted file mode 100644
index b402f58bb..000000000
--- a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/BasicReadWriteLock.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2008, 2016 IBM Corporation and others
- * All rights reserved. This program and the accompanying materials are made
- * available under the terms of the Eclipse Public License v1.0 which
- * accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- ******************************************************************************/
-package org.eclipse.osgi.internal.log;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class BasicReadWriteLock {
- private List<Thread> currentReaders = new ArrayList<>(2);
- private int writersWaiting = 0;
- private Thread writing = null;
-
- public synchronized int readLock() {
- while (writing != null || writersWaiting != 0) {
- try {
- if (writing == Thread.currentThread())
- throw new IllegalStateException("Attempted to nest read lock inside a write lock"); //$NON-NLS-1$
- wait();
- } catch (InterruptedException e) {
- // reset interrupted state but keep waiting
- Thread.currentThread().interrupt();
- }
- }
- currentReaders.add(Thread.currentThread());
- if (currentReaders.size() == 1)
- return 1;
- Thread current = Thread.currentThread();
- int result = 0;
- for (Thread reader : currentReaders) {
- if (reader == current)
- result++;
- }
- return result;
- }
-
- public synchronized void readUnlock() {
- currentReaders.remove(Thread.currentThread());
- notifyAll();
- }
-
- public synchronized void writeLock() {
- writersWaiting++;
- try {
- while (writing != null || currentReaders.size() != 0) {
- try {
- if (writing == Thread.currentThread() || currentReaders.contains(Thread.currentThread()))
- throw new IllegalStateException("Attempted to nest write lock inside a read or write lock"); //$NON-NLS-1$
- wait();
- } catch (InterruptedException e) {
- // reset interrupted state but keep waiting
- Thread.currentThread().interrupt();
- }
- }
- } finally {
- writersWaiting--;
- }
- writing = Thread.currentThread();
- }
-
- public synchronized void writeUnlock() {
- writing = null;
- notifyAll();
- }
-}
diff --git a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java b/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java
index 18a62cc11..823ac2453 100644
--- a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java
+++ b/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/ExtendedLogReaderServiceFactory.java
@@ -10,12 +10,29 @@ package org.eclipse.osgi.internal.log;
import java.io.PrintStream;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.eclipse.equinox.log.LogFilter;
import org.eclipse.equinox.log.SynchronousLogListener;
import org.eclipse.osgi.framework.util.ArrayMap;
-import org.osgi.framework.*;
-import org.osgi.service.log.*;
+import org.eclipse.osgi.internal.framework.EquinoxContainer;
+import org.eclipse.osgi.internal.log.OrderedExecutor.OrderedTaskQueue;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.log.LogEntry;
+import org.osgi.service.log.LogLevel;
+import org.osgi.service.log.LogListener;
public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedLogReaderServiceImpl> {
@@ -48,7 +65,7 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
private static PrintStream errorStream;
- private final BasicReadWriteLock listenersLock = new BasicReadWriteLock();
+ private final ReentrantReadWriteLock listenersLock = new ReentrantReadWriteLock();
private ArrayMap<LogListener, Object[]> listeners = new ArrayMap<>(5);
private LogFilter[] filters = null;
private final ThreadLocal<int[]> nestedCallCount = new ThreadLocal<>();
@@ -56,6 +73,8 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
private final int maxHistory;
private final LogLevel defaultLevel;
+ private OrderedExecutor executor;
+
static boolean safeIsLoggable(LogFilter filter, Bundle bundle, String name, int level) {
try {
return filter.isLoggable(bundle, name, level);
@@ -108,6 +127,14 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
}
}
+ public void start(EquinoxContainer equinoxContainer) {
+ executor = new OrderedExecutor(equinoxContainer);
+ }
+
+ public void stop() {
+ executor.shutdown();
+ }
+
public LogLevel getDefaultLogLevel() {
return defaultLevel;
}
@@ -133,11 +160,11 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
boolean isLoggablePrivileged(Bundle bundle, String name, int level) {
LogFilter[] filtersCopy;
- listenersLock.readLock();
+ listenersLock.readLock().lock();
try {
filtersCopy = filters;
} finally {
- listenersLock.readUnlock();
+ listenersLock.readLock().unlock();
}
try {
if (incrementNestedCount() == MAX_RECURSIONS)
@@ -199,11 +226,11 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
LogEntry logEntry = new ExtendedLogEntryImpl(bundle, name, stackTraceElement, context, logLevelEnum, level, message, exception);
storeEntry(logEntry);
ArrayMap<LogListener, Object[]> listenersCopy;
- listenersLock.readLock();
+ listenersLock.readLock().lock();
try {
listenersCopy = listeners;
} finally {
- listenersLock.readUnlock();
+ listenersLock.readLock().unlock();
}
try {
if (incrementNestedCount() >= MAX_RECURSIONS)
@@ -214,9 +241,9 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
LogFilter filter = (LogFilter) listenerObjects[0];
if (safeIsLoggable(filter, bundle, name, level)) {
LogListener listener = listenersCopy.getKey(i);
- SerializedTaskQueue taskQueue = (SerializedTaskQueue) listenerObjects[1];
- if (taskQueue != null) {
- taskQueue.put(new LogTask(logEntry, listener));
+ OrderedTaskQueue orderedTaskQueue = (OrderedTaskQueue) listenerObjects[1];
+ if (orderedTaskQueue != null) {
+ orderedTaskQueue.execute(new LogTask(logEntry, listener), size);
} else {
// log synchronously
safeLogged(listener, logEntry);
@@ -240,13 +267,13 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
}
void addLogListener(LogListener listener, LogFilter filter) {
- listenersLock.writeLock();
+ listenersLock.writeLock().lock();
try {
ArrayMap<LogListener, Object[]> listenersCopy = new ArrayMap<>(listeners.getKeys(), listeners.getValues());
Object[] listenerObjects = listenersCopy.get(listener);
if (listenerObjects == null) {
// Only create a task queue for non-SynchronousLogListeners
- SerializedTaskQueue taskQueue = (listener instanceof SynchronousLogListener) ? null : new SerializedTaskQueue(listener.toString());
+ OrderedTaskQueue taskQueue = (listener instanceof SynchronousLogListener) ? null : executor.createQueue();
listenerObjects = new Object[] {filter, taskQueue};
} else if (filter != listenerObjects[0]) {
// update the filter
@@ -256,7 +283,7 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
recalculateFilters(listenersCopy);
listeners = listenersCopy;
} finally {
- listenersLock.writeUnlock();
+ listenersLock.writeLock().unlock();
}
}
@@ -280,14 +307,14 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
}
void removeLogListener(LogListener listener) {
- listenersLock.writeLock();
+ listenersLock.writeLock().lock();
try {
ArrayMap<LogListener, Object[]> listenersCopy = new ArrayMap<>(listeners.getKeys(), listeners.getValues());
listenersCopy.remove(listener);
recalculateFilters(listenersCopy);
listeners = listenersCopy;
} finally {
- listenersLock.writeUnlock();
+ listenersLock.writeLock().unlock();
}
}
@@ -301,3 +328,127 @@ public class ExtendedLogReaderServiceFactory implements ServiceFactory<ExtendedL
}
}
+
+/**
+* This Executor uses OrderedTaskQueue to execute tasks in a FIFO order.
+*/
+class OrderedExecutor implements ThreadFactory {
+ private final int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), 10);
+ private final String logThreadName;
+ private final ThreadPoolExecutor delegate;
+ private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+ private int coreSize = 0;
+
+ public OrderedExecutor(final EquinoxContainer equinoxContainer) {
+ this.logThreadName = "Equinox Log Thread - " + equinoxContainer.toString(); //$NON-NLS-1$
+ this.delegate = new ThreadPoolExecutor(0, nThreads, 10L, TimeUnit.SECONDS, queue, this);
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, logThreadName);
+ t.setDaemon(true);
+ return t;
+ }
+
+ void executeOrderedTask(Runnable task, OrderedTaskQueue dependencyQueue, int numListeners) {
+ OrderedTaskQueue.OrderedTask firstOrderedTask;
+ synchronized (this) {
+ // This will either queue the task to wait for existing ones to finish
+ // or it will return the first ordered task so it can kick off the execution
+ // for ordered task queue
+ firstOrderedTask = dependencyQueue.addTaskAndReturnIfFirst(task);
+ if (firstOrderedTask != null) {
+ // Check that we are at the optimal target for core pool size
+ int targetSize = Math.min(nThreads, numListeners);
+ if (coreSize < targetSize) {
+ coreSize = targetSize;
+ delegate.setCorePoolSize(coreSize);
+ }
+ }
+ }
+
+ // execute the first ordered task now
+ if (firstOrderedTask != null) {
+ delegate.execute(firstOrderedTask);
+ }
+ }
+
+ OrderedTaskQueue createQueue() {
+ return new OrderedTaskQueue();
+ }
+
+ void shutdown() {
+ delegate.shutdown();
+ }
+
+ void executeNextTask(OrderedTaskQueue taskQueue) {
+ OrderedTaskQueue.OrderedTask nextTask;
+ synchronized (this) {
+ nextTask = taskQueue.getNextTask();
+ if (nextTask == null && queue.isEmpty()) {
+ // The event storm has ended, let the threads be reclaimed
+ delegate.setCorePoolSize(0);
+ coreSize = 0;
+ }
+ }
+ if (nextTask != null) {
+ delegate.execute(nextTask);
+ }
+ }
+
+ /**
+ * Keeps an list of ordered tasks and guarantees the tasks are run in the order
+ * they are queued. Tasks executed with this queue will always be run
+ * in FIFO order and will never run in parallel to guarantee events are
+ * received in the proper order by the listener. Each log listener
+ * has its own ordered task queue.
+ * <p>
+ * Note that only the execute method is thread safe. All other methods
+ * must be guarded by the OrderedExecutor monitor.
+ */
+ class OrderedTaskQueue {
+ private final Queue<OrderedTask> dependencyQueue = new LinkedList<>();
+ private AtomicReference<OrderedTask> firstTask = new AtomicReference<>();
+
+ void execute(Runnable task, int numListeners) {
+ executeOrderedTask(task, this, numListeners);
+ }
+
+ OrderedTask addTaskAndReturnIfFirst(Runnable task) {
+ OrderedTask orderedTask = new OrderedTask(task);
+ if (firstTask.compareAndSet(null, orderedTask)) {
+ return orderedTask;
+ }
+ dependencyQueue.add(orderedTask);
+ return null;
+ }
+
+ OrderedTask getNextTask() {
+ OrderedTask nextTask = dependencyQueue.poll();
+ if (nextTask == null) {
+ // The queue has been drained reset the first task.
+ // the next task for this ordered task queue will become the first again
+ firstTask.set(null);
+ }
+ return nextTask;
+ }
+
+ class OrderedTask implements Runnable {
+ private final Runnable task;
+
+ public OrderedTask(Runnable task) {
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ try {
+ task.run();
+ } finally {
+ executeNextTask(OrderedTaskQueue.this);
+ }
+ }
+ }
+ }
+}
diff --git a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/LogServiceManager.java b/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/LogServiceManager.java
index 788a5fcd6..c9b7579af 100644
--- a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/LogServiceManager.java
+++ b/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/LogServiceManager.java
@@ -11,11 +11,34 @@ import java.io.File;
import java.io.InputStream;
import java.net.URL;
import java.security.cert.X509Certificate;
-import java.util.*;
-import org.eclipse.equinox.log.*;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import org.eclipse.equinox.log.ExtendedLogReaderService;
+import org.eclipse.equinox.log.ExtendedLogService;
+import org.eclipse.equinox.log.LogFilter;
+import org.eclipse.osgi.internal.framework.BundleContextImpl;
import org.eclipse.osgi.internal.framework.EquinoxContainer;
-import org.osgi.framework.*;
-import org.osgi.service.log.*;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleEvent;
+import org.osgi.framework.BundleListener;
+import org.osgi.framework.Constants;
+import org.osgi.framework.FrameworkEvent;
+import org.osgi.framework.FrameworkListener;
+import org.osgi.framework.ServiceEvent;
+import org.osgi.framework.ServiceListener;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.Version;
+import org.osgi.service.log.LogLevel;
+import org.osgi.service.log.LogListener;
+import org.osgi.service.log.LogReaderService;
+import org.osgi.service.log.LogService;
+import org.osgi.service.log.LoggerFactory;
import org.osgi.service.log.admin.LoggerAdmin;
public class LogServiceManager implements BundleListener, FrameworkListener, ServiceListener {
@@ -49,6 +72,7 @@ public class LogServiceManager implements BundleListener, FrameworkListener, Ser
}
public void start(BundleContext context) {
+ logReaderServiceFactory.start(((BundleContextImpl) context).getContainer());
systemBundleLog.setBundle(context.getBundle());
context.addBundleListener(this);
context.addServiceListener(this);
@@ -83,6 +107,7 @@ public class LogServiceManager implements BundleListener, FrameworkListener, Ser
context.removeFrameworkListener(this);
context.removeServiceListener(this);
context.removeBundleListener(this);
+ logReaderServiceFactory.stop();
}
public ExtendedLogService getSystemBundleLog() {
diff --git a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/SerializedTaskQueue.java b/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/SerializedTaskQueue.java
deleted file mode 100644
index 4226421a9..000000000
--- a/bundles/org.eclipse.osgi/container/src/org/eclipse/osgi/internal/log/SerializedTaskQueue.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2006, 2016 Cognos Incorporated, IBM Corporation and others
- * All rights reserved. This program and the accompanying materials are made
- * available under the terms of the Eclipse Public License v1.0 which
- * accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- ******************************************************************************/
-package org.eclipse.osgi.internal.log;
-
-import java.util.LinkedList;
-
-/**
- * SerializedTaskQueue is a utility class that will allow asynchronous but serialized execution of tasks
- */
-public class SerializedTaskQueue {
-
- private static final int MAX_WAIT = 5000;
- private final LinkedList<Runnable> tasks = new LinkedList<>();
- private Thread thread;
- private final String queueName;
-
- public SerializedTaskQueue(String queueName) {
- this.queueName = queueName;
- }
-
- public synchronized void put(Runnable newTask) {
- tasks.add(newTask);
- if (thread == null) {
- thread = new Thread(queueName) {
- public void run() {
- Runnable task = nextTask(MAX_WAIT);
- while (task != null) {
- task.run();
- task = nextTask(MAX_WAIT);
- }
- }
- };
- thread.start();
- } else
- notify();
- }
-
- synchronized Runnable nextTask(int maxWait) {
- if (tasks.isEmpty()) {
- try {
- wait(maxWait);
- } catch (InterruptedException e) {
- // ignore -- we control the stack here and do not need to propagate it.
- }
-
- if (tasks.isEmpty()) {
- thread = null;
- return null;
- }
- }
- return tasks.removeFirst();
- }
-}

Back to the top