Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFrancois Chouinard2012-11-30 19:46:26 +0000
committerFrancois Chouinard2012-11-30 22:12:51 +0000
commit6ccf56714250f70c83d1f12078d2c5eb2648475a (patch)
tree2d421249df00ac8856a2fc8479a73220f2d47699
parent15d0249714b54fed528bbb6f539e5b234c523824 (diff)
downloadorg.eclipse.linuxtools-6ccf56714250f70c83d1f12078d2c5eb2648475a.tar.gz
org.eclipse.linuxtools-6ccf56714250f70c83d1f12078d2c5eb2648475a.tar.xz
org.eclipse.linuxtools-6ccf56714250f70c83d1f12078d2c5eb2648475a.zip
Make lower priority requests pre-emptible
The current request priority handling is based on splitting lower priority requests into chunks of a fixed size that don't take too long to process. If a higher priority request is issued (e.g. as the result of a user action), it is simply put in front of the request queue i.e. before the next low-priority chunk. This patch modifies the TmfRequestExecutor to use 2 queues (one for each priority) and implements a pre-emption scheme to make higher priority requests run immediately. This is the first step in a more comprehensive overhaul of the TMF Request Model. Change-Id: I6413cbbd69985d88b3fd5b5375a0b7ec59104682 Signed-off-by: Francois Chouinard <fchouinard@gmail.com> Reviewed-on: https://git.eclipse.org/r/8983 Tested-by: Hudson CI
-rw-r--r--lttng/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/request/TmfRequestExecutorTest.java235
-rw-r--r--lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfEventThread.java245
-rw-r--r--lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfThread.java50
-rw-r--r--lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/request/TmfRequestExecutor.java168
-rw-r--r--lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfDataProvider.java161
-rw-r--r--lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfEventProvider.java98
-rw-r--r--lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfExperiment.java2
-rw-r--r--lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfTrace.java2
8 files changed, 549 insertions, 412 deletions
diff --git a/lttng/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/request/TmfRequestExecutorTest.java b/lttng/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/request/TmfRequestExecutorTest.java
index 4ed9e99150..bd6ef51b3c 100644
--- a/lttng/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/request/TmfRequestExecutorTest.java
+++ b/lttng/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/request/TmfRequestExecutorTest.java
@@ -1,34 +1,42 @@
/*******************************************************************************
- * Copyright (c) 2009, 2010 Ericsson
- *
+ * Copyright (c) 2009, 2010, 2012 Ericsson
+ *
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the Eclipse Public License v1.0 which
* accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
- *
+ *
* Contributors:
* Francois Chouinard - Initial API and implementation
*******************************************************************************/
package org.eclipse.linuxtools.tmf.core.tests.request;
-import java.util.concurrent.Executors;
-
import junit.framework.TestCase;
+import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread;
import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
+import org.eclipse.linuxtools.tmf.core.component.TmfDataProvider;
+import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
+import org.eclipse.linuxtools.tmf.core.event.TmfEvent;
+import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
+import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
+import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
+import org.eclipse.linuxtools.tmf.core.signal.TmfSignal;
+import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
+import org.eclipse.linuxtools.tmf.core.trace.ITmfLocation;
/**
- * <b><u>TmfRequestExecutorTest</u></b>
- *
* Test suite for the TmfRequestExecutor class.
*/
@SuppressWarnings({ "nls" })
public class TmfRequestExecutorTest extends TestCase {
- // ------------------------------------------------------------------------
- // Variables
- // ------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+ // Variables
+ // ------------------------------------------------------------------------
+
+ private TmfRequestExecutor fExecutor;
// ------------------------------------------------------------------------
// Housekeeping
@@ -41,15 +49,18 @@ public class TmfRequestExecutorTest extends TestCase {
super(name);
}
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- }
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ fExecutor = new TmfRequestExecutor();
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ fExecutor.stop();
+ }
// ------------------------------------------------------------------------
// Constructors
@@ -60,17 +71,6 @@ public class TmfRequestExecutorTest extends TestCase {
*/
public void testTmfRequestExecutor() {
TmfRequestExecutor executor = new TmfRequestExecutor();
- assertEquals("nbPendingRequests", 0, executor.getNbPendingRequests());
- assertFalse("isShutdown", executor.isShutdown());
- assertFalse("isTerminated", executor.isTerminated());
- }
-
- /**
- * Test method for {@link org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor#TmfRequestExecutor(java.util.concurrent.ExecutorService)}.
- */
- public void testTmfRequestExecutorExecutorService() {
- TmfRequestExecutor executor = new TmfRequestExecutor(Executors.newCachedThreadPool());
- assertEquals("nbPendingRequests", 0, executor.getNbPendingRequests());
assertFalse("isShutdown", executor.isShutdown());
assertFalse("isTerminated", executor.isTerminated());
}
@@ -81,7 +81,6 @@ public class TmfRequestExecutorTest extends TestCase {
public void testStop() {
TmfRequestExecutor executor = new TmfRequestExecutor();
executor.stop();
- assertEquals("nbPendingRequests", 0, executor.getNbPendingRequests());
assertTrue("isShutdown", executor.isShutdown());
assertTrue("isTerminated", executor.isTerminated());
}
@@ -90,12 +89,172 @@ public class TmfRequestExecutorTest extends TestCase {
// execute
// ------------------------------------------------------------------------
- /**
+ // Dummy context
+ private static class MyContext implements ITmfContext {
+ private long fNbRequested;
+ private long fRank;
+
+ public MyContext(long requested) {
+ fNbRequested = requested;
+ fRank = 0;
+ }
+ @Override
+ public long getRank() {
+ return (fRank <= fNbRequested) ? fRank : -1;
+ }
+ @Override
+ public ITmfLocation getLocation() {
+ return null;
+ }
+ @Override
+ public boolean hasValidRank() {
+ return true;
+ }
+ @Override
+ public void setLocation(ITmfLocation location) {
+ }
+ @Override
+ public void setRank(long rank) {
+ fRank = rank;
+ }
+ @Override
+ public void increaseRank() {
+ fRank++;
+ }
+ @Override
+ public void dispose() {
+ }
+ @Override
+ public MyContext clone() {
+ return this;
+ }
+ }
+
+ // Dummy provider
+ private static class MyProvider extends TmfDataProvider {
+ private ITmfEvent fEvent = new TmfEvent();
+
+ @Override
+ public String getName() {
+ return null;
+ }
+ @Override
+ public void dispose() {
+ }
+ @Override
+ public void broadcast(TmfSignal signal) {
+ }
+ @Override
+ public void sendRequest(ITmfDataRequest request) {
+ }
+ @Override
+ public void fireRequest() {
+ }
+ @Override
+ public void notifyPendingRequest(boolean isIncrement) {
+ }
+ @Override
+ public ITmfEvent getNext(ITmfContext context) {
+ context.increaseRank();
+ return context.getRank() >= 0 ? fEvent : null;
+ }
+ @Override
+ public ITmfContext armRequest(ITmfDataRequest request) {
+ return new MyContext(request.getNbRequested());
+ }
+ }
+
+ // Dummy request
+ private static class MyRequest extends TmfDataRequest {
+ public MyRequest(ExecutionType priority, int requested) {
+ super(ITmfEvent.class, 0, requested, priority);
+ }
+ @Override
+ public void done() {
+ synchronized (monitor) {
+ monitor.notifyAll();
+ }
+ }
+ }
+
+ // Dummy thread
+ private static class MyThread extends TmfEventThread {
+ public MyThread(TmfDataProvider provider, ITmfDataRequest request) {
+ super(provider, request);
+ }
+ }
+
+ private final static Object monitor = new Object();
+
+ /**
* Test method for {@link org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor#execute(java.lang.Runnable)}.
*/
public void testExecute() {
-// fail("Not yet implemented");
- }
+ MyProvider provider = new MyProvider();
+ MyRequest request1 = new MyRequest(ExecutionType.BACKGROUND, Integer.MAX_VALUE / 5);
+ MyThread thread1 = new MyThread(provider, request1);
+ MyRequest request2 = new MyRequest(ExecutionType.FOREGROUND, Integer.MAX_VALUE / 10);
+ MyThread thread2 = new MyThread(provider, request2);
+ MyRequest request3 = new MyRequest(ExecutionType.FOREGROUND, Integer.MAX_VALUE / 10);
+ MyThread thread3 = new MyThread(provider, request3);
+
+ // Start thread1
+ fExecutor.execute(thread1);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ assertTrue("isRunning", thread1.isRunning());
+
+ // Start higher priority thread2
+ fExecutor.execute(thread2);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ assertFalse("isRunning", thread1.isRunning());
+ assertTrue("isRunning", thread2.isRunning());
+
+ // Wait for end of thread2
+ try {
+ synchronized (monitor) {
+ monitor.wait();
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ }
+ assertTrue("isCompleted", thread2.isCompleted());
+ assertTrue("isRunning", thread1.isRunning());
+
+ // Start higher priority thread3
+ fExecutor.execute(thread3);
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ assertFalse("isRunning", thread1.isRunning());
+ assertTrue("isRunning", thread3.isRunning());
+
+ // Wait for end of thread3
+ try {
+ synchronized (monitor) {
+ monitor.wait();
+ Thread.sleep(500);
+ }
+ } catch (InterruptedException e) {
+ }
+ assertTrue("isCompleted", thread3.isCompleted());
+ assertTrue("isRunning", thread1.isRunning());
+
+ // Wait for thread1 completion
+ try {
+ synchronized (monitor) {
+ monitor.wait();
+ }
+ } catch (InterruptedException e) {
+ }
+ assertTrue("isCompleted", thread1.isCompleted());
+ }
// ------------------------------------------------------------------------
// toString
@@ -105,13 +264,9 @@ public class TmfRequestExecutorTest extends TestCase {
* Test method for {@link org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor#toString()}.
*/
public void testToString() {
-// TmfRequestExecutor executor1 = new TmfRequestExecutor();
-// String expected1 = "[TmfRequestExecutor(DelegatedExecutorService)]";
-// assertEquals("toString", expected1, executor1.toString());
-//
-// TmfRequestExecutor executor2 = new TmfRequestExecutor(Executors.newCachedThreadPool());
-// String expected2 = "[TmfRequestExecutor(ThreadPoolExecutor)]";
-// assertEquals("toString", expected2, executor2.toString());
+ TmfRequestExecutor executor = new TmfRequestExecutor();
+ String expected = "[TmfRequestExecutor(ThreadPoolExecutor)]";
+ assertEquals("toString", expected, executor.toString());
}
}
diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfEventThread.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfEventThread.java
new file mode 100644
index 0000000000..97c95cbdc6
--- /dev/null
+++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfEventThread.java
@@ -0,0 +1,245 @@
+/*******************************************************************************
+ * Copyright (c) 2012 Ericsson
+ *
+ * All rights reserved. This program and the accompanying materials are
+ * made available under the terms of the Eclipse Public License v1.0 which
+ * accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Francois Chouinard - Initial API and implementation
+ *******************************************************************************/
+
+package org.eclipse.linuxtools.internal.tmf.core.component;
+
+import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer;
+import org.eclipse.linuxtools.tmf.core.component.ITmfDataProvider;
+import org.eclipse.linuxtools.tmf.core.component.TmfDataProvider;
+import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
+import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
+import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
+import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
+
+/**
+ * Provides the core event request processor. It also has support for suspending
+ * and resuming a request in a thread-safe manner.
+ *
+ * @author Francois Chouinard
+ * @version 1.0
+ */
+public class TmfEventThread implements Runnable {
+
+ // ------------------------------------------------------------------------
+ // Attributes
+ // ------------------------------------------------------------------------
+
+ /**
+ * The event provider
+ */
+ private final TmfDataProvider fProvider;
+
+ /**
+ * The wrapped event request
+ */
+ private final ITmfDataRequest fRequest;
+
+ /**
+ * The request execution priority
+ */
+ private final ExecutionType fExecType;
+
+ /**
+ * The wrapped thread (if applicable)
+ */
+ private final TmfEventThread fThread;
+
+ /**
+ * The thread execution state
+ */
+ private volatile boolean isPaused = false;
+ private volatile boolean isCompleted = false;
+
+ /**
+ * The synchronization object
+ */
+ private final Object object = new Object();
+
+ // ------------------------------------------------------------------------
+ // Constructor
+ // ------------------------------------------------------------------------
+
+ /**
+ * Basic constructor
+ *
+ * @param provider the event provider
+ * @param request the request to process
+ */
+ public TmfEventThread(TmfDataProvider provider, ITmfDataRequest request) {
+ assert provider != null;
+ assert request != null;
+ fProvider = provider;
+ fRequest = request;
+ fExecType = request.getExecType();
+ fThread = null;
+ }
+
+ /**
+ * Wrapper constructor
+ *
+ * @param thread the thread to wrap
+ */
+ public TmfEventThread(TmfEventThread thread) {
+ fProvider = thread.fProvider;
+ fRequest = thread.fRequest;
+ fExecType = thread.fExecType;
+ fThread = thread;
+ }
+
+ // ------------------------------------------------------------------------
+ // Getters
+ // ------------------------------------------------------------------------
+
+ /**
+ * @return The wrapped thread
+ */
+ public TmfEventThread getThread() {
+ return fThread;
+ }
+
+ /**
+ * @return The event provider
+ */
+ public ITmfDataProvider getProvider() {
+ return fProvider;
+ }
+
+ /**
+ * @return The event request
+ */
+ public ITmfDataRequest getRequest() {
+ return fRequest;
+ }
+
+ /**
+ * @return The request execution priority
+ */
+ public ExecutionType getExecType() {
+ return fExecType;
+ }
+
+ /**
+ * @return The request execution state
+ */
+ public boolean isRunning() {
+ return fRequest.isRunning() && !isPaused;
+ }
+
+ /**
+ * @return The request execution state
+ */
+ public boolean isCompleted() {
+ return isCompleted;
+ }
+
+ // ------------------------------------------------------------------------
+ // Runnable
+ // ------------------------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+
+ TmfCoreTracer.traceRequest(fRequest, "is being serviced by " + fProvider.getName()); //$NON-NLS-1$
+
+ // Extract the generic information
+ fRequest.start();
+ int nbRequested = fRequest.getNbRequested();
+ int nbRead = 0;
+ isCompleted = false;
+
+ // Initialize the execution
+ ITmfContext context = fProvider.armRequest(fRequest);
+ if (context == null) {
+ fRequest.cancel();
+ return;
+ }
+
+ try {
+ // Get the ordered events
+ ITmfEvent event = fProvider.getNext(context);
+ TmfCoreTracer.traceRequest(fRequest, "read first event"); //$NON-NLS-1$
+
+ while (event != null && !fProvider.isCompleted(fRequest, event, nbRead)) {
+ if (isPaused) {
+ try {
+ while (isPaused) {
+ synchronized (object) {
+ object.wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+
+ TmfCoreTracer.traceEvent(fProvider, fRequest, event);
+ if (fRequest.getDataType().isInstance(event)) {
+ fRequest.handleData(event);
+ }
+
+ // To avoid an unnecessary read passed the last event requested
+ if (++nbRead < nbRequested) {
+ event = fProvider.getNext(context);
+ }
+ }
+
+ isCompleted = true;
+
+ if (fRequest.isCancelled()) {
+ fRequest.cancel();
+ } else {
+ fRequest.done();
+ }
+
+ } catch (Exception e) {
+ fRequest.fail();
+ }
+
+ // Cleanup
+ context.dispose();
+ }
+
+ // ------------------------------------------------------------------------
+ // Operations
+ // ------------------------------------------------------------------------
+
+ /**
+ * Suspend the thread
+ */
+ public synchronized void suspend() {
+ isPaused = true;
+ TmfCoreTracer.traceRequest(fRequest, "SUSPENDED"); //$NON-NLS-1$
+ }
+
+ /**
+ * Resume the thread
+ */
+ public synchronized void resume() {
+ isPaused = false;
+ synchronized (object) {
+ object.notifyAll();
+ }
+ TmfCoreTracer.traceRequest(fRequest, "RESUMED"); //$NON-NLS-1$
+ }
+
+ /**
+ * Cancel the request
+ */
+ public void cancel() {
+ if (!fRequest.isCompleted()) {
+ fRequest.cancel();
+ }
+ }
+
+}
diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfThread.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfThread.java
deleted file mode 100644
index 9453bf23ed..0000000000
--- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfThread.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2010 Ericsson
- *
- * All rights reserved. This program and the accompanying materials are
- * made available under the terms of the Eclipse Public License v1.0 which
- * accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Francois Chouinard - Initial API and implementation
- *******************************************************************************/
-
-package org.eclipse.linuxtools.internal.tmf.core.component;
-
-import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
-
-/**
- * Utility class to add an execution class to a simple Java thread
- *
- * @version 1.0
- * @author Francois Chouinard
- */
-public class TmfThread extends Thread {
-
- private final ExecutionType fExecType;
-
- /**
- * Standard constructor
- *
- * @param execType
- * The data request's ExecutionType
- */
- public TmfThread(ExecutionType execType) {
- fExecType = execType;
- }
-
- /**
- * @return The Execution type
- */
- public ExecutionType getExecType() {
- return fExecType;
- }
-
- /**
- * Cancel this request
- */
- public void cancel() {
- }
-
-}
diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/request/TmfRequestExecutor.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/request/TmfRequestExecutor.java
index 09e8716822..2be6143934 100644
--- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/request/TmfRequestExecutor.java
+++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/request/TmfRequestExecutor.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2009, 2010 Ericsson
+ * Copyright (c) 2009, 2010, 2012 Ericsson
*
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the Eclipse Public License v1.0 which
@@ -8,44 +8,44 @@
*
* Contributors:
* Francois Chouinard - Initial API and implementation
+ * Francois Chouinard - Added support for pre-emption
*******************************************************************************/
package org.eclipse.linuxtools.internal.tmf.core.request;
-import java.util.Comparator;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer;
-import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread;
+import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread;
import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
/**
* A simple, straightforward request executor.
*
- * @version 1.0
* @author Francois Chouinard
+ * @version 1.1
*/
public class TmfRequestExecutor implements Executor {
- private final ExecutorService fExecutor;
+ // ------------------------------------------------------------------------
+ // Attributes
+ // ------------------------------------------------------------------------
+
+ // The request executor
+ private final ExecutorService fExecutor = Executors.newFixedThreadPool(2);
private final String fExecutorName;
- private final PriorityBlockingQueue<TmfThread> fRequestQueue = new PriorityBlockingQueue<TmfThread>(
- 100, new Comparator<TmfThread>() {
- @Override
- public int compare(TmfThread o1, TmfThread o2) {
- if (o1.getExecType() == o2.getExecType()) {
- return 0;
- }
- if (o1.getExecType() == ExecutionType.BACKGROUND) {
- return 1;
- }
- return -1;
- }
- });
- private TmfThread fCurrentRequest;
+
+ // The request queues
+ private final Queue<TmfEventThread> fHighPriorityTasks = new ArrayBlockingQueue<TmfEventThread>(100);
+ private final Queue<TmfEventThread> fLowPriorityTasks = new ArrayBlockingQueue<TmfEventThread>(100);
+
+ // The tasks
+ private TmfEventThread fActiveTask;
+ private TmfEventThread fSuspendedTask;
// ------------------------------------------------------------------------
// Constructors
@@ -55,7 +55,11 @@ public class TmfRequestExecutor implements Executor {
* Default constructor
*/
public TmfRequestExecutor() {
- this(Executors.newSingleThreadExecutor());
+ String canonicalName = fExecutor.getClass().getCanonicalName();
+ fExecutorName = canonicalName.substring(canonicalName.lastIndexOf('.') + 1);
+ if (TmfCoreTracer.isComponentTraced()) {
+ TmfCoreTracer.trace(fExecutor + " created"); //$NON-NLS-1$
+ }
}
/**
@@ -63,21 +67,21 @@ public class TmfRequestExecutor implements Executor {
*
* @param executor The executor service to use
*/
+ @Deprecated
public TmfRequestExecutor(ExecutorService executor) {
- fExecutor = executor;
- String canonicalName = fExecutor.getClass().getCanonicalName();
- fExecutorName = canonicalName.substring(canonicalName.lastIndexOf('.') + 1);
- if (TmfCoreTracer.isComponentTraced())
- {
- TmfCoreTracer.trace(fExecutor + " created"); //$NON-NLS-1$
- }
+ this();
}
+ // ------------------------------------------------------------------------
+ // Getters
+ // ------------------------------------------------------------------------
+
/**
* @return the number of pending requests
*/
+ @Deprecated
public synchronized int getNbPendingRequests() {
- return fRequestQueue.size();
+ return fHighPriorityTasks.size() + fLowPriorityTasks.size();
}
/**
@@ -94,64 +98,84 @@ public class TmfRequestExecutor implements Executor {
return fExecutor.isTerminated();
}
- /**
- * Stops the executor
- */
- public synchronized void stop() {
- if (fCurrentRequest != null) {
- fCurrentRequest.cancel();
- }
-
- while ((fCurrentRequest = fRequestQueue.poll()) != null) {
- fCurrentRequest.cancel();
- }
-
- fExecutor.shutdown();
- if (TmfCoreTracer.isComponentTraced())
- {
- TmfCoreTracer.trace(fExecutor + " terminated"); //$NON-NLS-1$
- }
- }
-
- // ------------------------------------------------------------------------
- // Operations
- // ------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+ // Operations
+ // ------------------------------------------------------------------------
/* (non-Javadoc)
* @see java.util.concurrent.Executor#execute(java.lang.Runnable)
*/
@Override
- public synchronized void execute(final Runnable requestThread) {
- fRequestQueue.offer(new TmfThread(((TmfThread) requestThread).getExecType()) {
- @Override
- public void run() {
- try {
- requestThread.run();
- } finally {
- scheduleNext();
- }
- }
+ public synchronized void execute(final Runnable command) {
+
+ // We are expecting MyEventThread:s
+ if (!(command instanceof TmfEventThread)) {
+ // TODO: Log an error
+ return;
+ }
+
+ // Wrap the thread in a MyThread
+ TmfEventThread thread = (TmfEventThread) command;
+ TmfEventThread wrapper = new TmfEventThread(thread) {
@Override
- public void cancel() {
- ((TmfThread) requestThread).cancel();
+ public void run() {
+ try {
+ command.run();
+ } finally {
+ scheduleNext();
+ }
}
- });
- if (fCurrentRequest == null) {
- scheduleNext();
- }
+ };
+
+ // Add the thread to the appropriate queue
+ ExecutionType priority = thread.getExecType();
+ (priority == ExecutionType.FOREGROUND ? fHighPriorityTasks : fLowPriorityTasks).offer(wrapper);
+
+ // Schedule or preempt as appropriate
+ if (fActiveTask == null) {
+ scheduleNext();
+ } else if (priority == ExecutionType.FOREGROUND && priority != fActiveTask.getExecType()) {
+ fActiveTask.getThread().suspend();
+ fSuspendedTask = fActiveTask;
+ scheduleNext();
+ }
}
/**
* Executes the next pending request, if applicable.
*/
protected synchronized void scheduleNext() {
- if ((fCurrentRequest = fRequestQueue.poll()) != null) {
- if (!isShutdown()) {
- fExecutor.execute(fCurrentRequest);
- }
- }
+ if (!isShutdown()) {
+ if ((fActiveTask = fHighPriorityTasks.poll()) != null) {
+ fExecutor.execute(fActiveTask);
+ } else if (fSuspendedTask != null) {
+ fActiveTask = fSuspendedTask;
+ fSuspendedTask = null;
+ fActiveTask.getThread().resume();
+ } else if ((fActiveTask = fLowPriorityTasks.poll()) != null) {
+ fExecutor.execute(fActiveTask);
+ }
+ }
}
+ /**
+ * Stops the executor
+ */
+ public synchronized void stop() {
+ if (fActiveTask != null) {
+ fActiveTask.cancel();
+ }
+
+ while ((fActiveTask = fHighPriorityTasks.poll()) != null) {
+ fActiveTask.cancel();
+ }
+
+ fExecutor.shutdown();
+ if (TmfCoreTracer.isComponentTraced()) {
+ TmfCoreTracer.trace(fExecutor + " terminated"); //$NON-NLS-1$
+ }
+ }
+
// ------------------------------------------------------------------------
// Object
// ------------------------------------------------------------------------
diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfDataProvider.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfDataProvider.java
index 0341ef56c5..b24624dfaf 100644
--- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfDataProvider.java
+++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfDataProvider.java
@@ -8,6 +8,7 @@
*
* Contributors:
* Francois Chouinard - Initial API and implementation
+ * Francois Chouinard - Replace background requests by pre-emptable requests
*******************************************************************************/
package org.eclipse.linuxtools.tmf.core.component;
@@ -18,8 +19,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer;
+import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread;
import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
-import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread;
import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest;
import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
@@ -40,10 +41,9 @@ import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
* The concrete class can either re-implement processRequest() entirely or just
* implement the hooks (initializeContext() and getNext()).
* <p>
- * TODO: Add support for providing multiple data types.
*
- * @version 1.0
* @author Francois Chouinard
+ * @version 1.1
*/
public abstract class TmfDataProvider extends TmfComponent implements ITmfDataProvider {
@@ -321,78 +321,12 @@ public abstract class TmfDataProvider extends TmfComponent implements ITmfDataPr
return;
}
- final TmfDataProvider provider = this;
-
- // Process the request
- TmfThread thread = new TmfThread(request.getExecType()) {
-
- @Override
- public void run() {
-
- if (TmfCoreTracer.isRequestTraced()) {
- TmfCoreTracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
- }
-
- // Extract the generic information
- request.start();
- int nbRequested = request.getNbRequested();
- int nbRead = 0;
-
- // Initialize the execution
- ITmfContext context = armRequest(request);
- if (context == null) {
- request.cancel();
- return;
- }
-
- try {
- // Get the ordered events
- ITmfEvent data = getNext(context);
- if (TmfCoreTracer.isRequestTraced()) {
- TmfCoreTracer.traceRequest(request, "read first event"); //$NON-NLS-1$
- }
- while (data != null && !isCompleted(request, data, nbRead)) {
- if (fLogData) {
- TmfCoreTracer.traceEvent(provider, request, data);
- }
- if (request.getDataType().isInstance(data)) {
- request.handleData(data);
- }
-
- // To avoid an unnecessary read passed the last data
- // requested
- if (++nbRead < nbRequested) {
- data = getNext(context);
- }
- }
- if (TmfCoreTracer.isRequestTraced()) {
- TmfCoreTracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$
- }
-
- if (request.isCancelled()) {
- request.cancel();
- } else {
- request.done();
- }
- } catch (Exception e) {
- request.fail();
- }
-
- // Cleanup
- context.dispose();
- }
-
- @Override
- public void cancel() {
- if (!request.isCompleted()) {
- request.cancel();
- }
- }
- };
+ TmfEventThread thread = new TmfEventThread(this, request);
if (TmfCoreTracer.isRequestTraced()) {
TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
}
+
fExecutor.execute(thread);
}
@@ -406,87 +340,8 @@ public abstract class TmfDataProvider extends TmfComponent implements ITmfDataPr
* @param indexing
* Should we index the chunks
*/
- protected void queueBackgroundRequest(final ITmfDataRequest request,
- final int blockSize, final boolean indexing) {
-
- final TmfDataProvider provider = this;
-
- Thread thread = new Thread() {
- @Override
- public void run() {
-
- if (TmfCoreTracer.isRequestTraced()) {
- TmfCoreTracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
- }
-
- request.start();
-
- final Integer[] CHUNK_SIZE = new Integer[1];
- CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
-
- final Integer[] nbRead = new Integer[1];
- nbRead[0] = 0;
-
- final Boolean[] isFinished = new Boolean[1];
- isFinished[0] = Boolean.FALSE;
-
- while (!isFinished[0]) {
-
- TmfDataRequest subRequest = new TmfDataRequest(request.getDataType(), request.getIndex()
- + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
-
- @Override
- public synchronized boolean isCompleted() {
- return super.isCompleted() || request.isCompleted();
- }
-
- @Override
- public void handleData(ITmfEvent data) {
- super.handleData(data);
- if (request.getDataType().isInstance(data)) {
- request.handleData(data);
- }
- if (getNbRead() > CHUNK_SIZE[0]) {
- System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
- }
- }
-
- @Override
- public void handleCompleted() {
- nbRead[0] += getNbRead();
- if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
- if (this.isCancelled()) {
- request.cancel();
- } else if (this.isFailed()) {
- request.fail();
- } else {
- request.done();
- }
- isFinished[0] = Boolean.TRUE;
- }
- super.handleCompleted();
- }
- };
-
- if (!isFinished[0]) {
- queueRequest(subRequest);
-
- try {
- subRequest.waitForCompletion();
- if (request.isCompleted()) {
- isFinished[0] = Boolean.TRUE;
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
- }
- }
- }
- };
-
- thread.start();
+ protected void queueBackgroundRequest(final ITmfDataRequest request, final int blockSize, final boolean indexing) {
+ queueRequest(request);
}
/**
@@ -498,7 +353,7 @@ public abstract class TmfDataProvider extends TmfComponent implements ITmfDataPr
* @return Sn application specific context; null if request can't be
* serviced
*/
- protected abstract ITmfContext armRequest(ITmfDataRequest request);
+ public abstract ITmfContext armRequest(ITmfDataRequest request);
// /**
// * Return the next event based on the context supplied. The context
diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfEventProvider.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfEventProvider.java
index 12ce5b59ec..cb54d2e2f4 100644
--- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfEventProvider.java
+++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfEventProvider.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2009, 2010 Ericsson
+ * Copyright (c) 2009, 2010, 2012 Ericsson
*
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the Eclipse Public License v1.0 which
@@ -8,6 +8,7 @@
*
* Contributors:
* Francois Chouinard - Initial API and implementation
+ * Francois Chouinard - Replace background requests by pre-emptable requests
*******************************************************************************/
package org.eclipse.linuxtools.tmf.core.component;
@@ -17,15 +18,13 @@ import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedEventRequest
import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
import org.eclipse.linuxtools.tmf.core.event.ITmfTimestamp;
import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
-import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
-import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest;
/**
* An extension of TmfDataProvider timestamped events providers.
*
- * @version 1.0
* @author Francois Chouinard
+ * @version 1.1
*/
public abstract class TmfEventProvider extends TmfDataProvider {
@@ -112,95 +111,4 @@ public abstract class TmfEventProvider extends TmfDataProvider {
}
}
- @Override
- protected void queueBackgroundRequest(final ITmfDataRequest request, final int blockSize, final boolean indexing) {
-
- if (! (request instanceof ITmfEventRequest)) {
- super.queueBackgroundRequest(request, blockSize, indexing);
- return;
- }
-
- final TmfDataProvider provider = this;
-
- Thread thread = new Thread() {
- @Override
- public void run() {
-
- if (TmfCoreTracer.isRequestTraced()) {
- TmfCoreTracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
- }
-
- request.start();
-
- final Integer[] CHUNK_SIZE = new Integer[1];
- CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
-
- final Integer[] nbRead = new Integer[1];
- nbRead[0] = 0;
-
- final Boolean[] isFinished = new Boolean[1];
- isFinished[0] = Boolean.FALSE;
-
- long startIndex = request.getIndex();
-
- while (!isFinished[0]) {
-
- TmfEventRequest subRequest= new TmfEventRequest(request.getDataType(), ((ITmfEventRequest) request).getRange(), startIndex + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
-
- @Override
- public synchronized boolean isCompleted() {
- return super.isCompleted() || request.isCompleted();
- }
-
- @Override
- public void handleData(ITmfEvent data) {
- super.handleData(data);
- if (request.getDataType().isInstance(data)) {
- request.handleData(data);
- }
- if (this.getNbRead() > CHUNK_SIZE[0]) {
- System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
- }
- }
-
- @Override
- public void handleCompleted() {
- nbRead[0] += this.getNbRead();
- if (nbRead[0] >= request.getNbRequested() || (this.getNbRead() < CHUNK_SIZE[0])) {
- if (this.isCancelled()) {
- request.cancel();
- } else if (this.isFailed()) {
- request.fail();
- } else {
- request.done();
- }
- isFinished[0] = Boolean.TRUE;
- }
- super.handleCompleted();
- }
- };
-
- if (!isFinished[0]) {
- queueRequest(subRequest);
-
- try {
- subRequest.waitForCompletion();
- if (request.isCompleted()) {
- isFinished[0] = Boolean.TRUE;
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- if (startIndex == 0 && nbRead[0].equals(CHUNK_SIZE[0])) { // do this only once if the event request index is unknown
- startIndex = subRequest.getIndex(); // update the start index with the index of the first subrequest's
- } // start time event which was set during the arm request
- CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
- }
- }
- }
- };
-
- thread.start();
- }
}
diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfExperiment.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfExperiment.java
index eda777efdd..fa9c948338 100644
--- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfExperiment.java
+++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfExperiment.java
@@ -202,7 +202,7 @@ public class TmfExperiment extends TmfTrace implements ITmfEventParser {
* @see org.eclipse.linuxtools.tmf.core.trace.TmfTrace#armRequest(org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest)
*/
@Override
- protected synchronized ITmfContext armRequest(final ITmfDataRequest request) {
+ public synchronized ITmfContext armRequest(final ITmfDataRequest request) {
// Make sure we have something to read from
if (fTraces == null) {
diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfTrace.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfTrace.java
index 6f776496de..e59637fb1f 100644
--- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfTrace.java
+++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfTrace.java
@@ -674,7 +674,7 @@ public abstract class TmfTrace extends TmfEventProvider implements ITmfTrace {
* @see org.eclipse.linuxtools.tmf.core.component.TmfDataProvider#armRequest(org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest)
*/
@Override
- protected synchronized ITmfContext armRequest(final ITmfDataRequest request) {
+ public synchronized ITmfContext armRequest(final ITmfDataRequest request) {
if (executorIsShutdown()) {
return null;
}

Back to the top