diff options
author | Francois Chouinard | 2012-11-30 19:46:26 +0000 |
---|---|---|
committer | Francois Chouinard | 2012-11-30 22:12:51 +0000 |
commit | 6ccf56714250f70c83d1f12078d2c5eb2648475a (patch) | |
tree | 2d421249df00ac8856a2fc8479a73220f2d47699 | |
parent | 15d0249714b54fed528bbb6f539e5b234c523824 (diff) | |
download | org.eclipse.linuxtools-6ccf56714250f70c83d1f12078d2c5eb2648475a.tar.gz org.eclipse.linuxtools-6ccf56714250f70c83d1f12078d2c5eb2648475a.tar.xz org.eclipse.linuxtools-6ccf56714250f70c83d1f12078d2c5eb2648475a.zip |
Make lower priority requests pre-emptible
The current request priority handling is based on splitting lower
priority requests into chunks of a fixed size that don't take too long
to process. If a higher priority request is issued (e.g. as the result
of a user action), it is simply put in front of the request queue i.e.
before the next low-priority chunk.
This patch modifies the TmfRequestExecutor to use 2 queues (one for each
priority) and implements a pre-emption scheme to make higher priority
requests run immediately.
This is the first step in a more comprehensive overhaul of the TMF
Request Model.
Change-Id: I6413cbbd69985d88b3fd5b5375a0b7ec59104682
Signed-off-by: Francois Chouinard <fchouinard@gmail.com>
Reviewed-on: https://git.eclipse.org/r/8983
Tested-by: Hudson CI
8 files changed, 549 insertions, 412 deletions
diff --git a/lttng/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/request/TmfRequestExecutorTest.java b/lttng/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/request/TmfRequestExecutorTest.java index 4ed9e99150..bd6ef51b3c 100644 --- a/lttng/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/request/TmfRequestExecutorTest.java +++ b/lttng/org.eclipse.linuxtools.tmf.core.tests/src/org/eclipse/linuxtools/tmf/core/tests/request/TmfRequestExecutorTest.java @@ -1,34 +1,42 @@ /******************************************************************************* - * Copyright (c) 2009, 2010 Ericsson - * + * Copyright (c) 2009, 2010, 2012 Ericsson + * * All rights reserved. This program and the accompanying materials are * made available under the terms of the Eclipse Public License v1.0 which * accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html - * + * * Contributors: * Francois Chouinard - Initial API and implementation *******************************************************************************/ package org.eclipse.linuxtools.tmf.core.tests.request; -import java.util.concurrent.Executors; - import junit.framework.TestCase; +import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread; import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor; +import org.eclipse.linuxtools.tmf.core.component.TmfDataProvider; +import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; +import org.eclipse.linuxtools.tmf.core.event.TmfEvent; +import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest; +import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType; +import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest; +import org.eclipse.linuxtools.tmf.core.signal.TmfSignal; +import org.eclipse.linuxtools.tmf.core.trace.ITmfContext; +import org.eclipse.linuxtools.tmf.core.trace.ITmfLocation; /** - * <b><u>TmfRequestExecutorTest</u></b> - * * Test suite for the TmfRequestExecutor class. */ @SuppressWarnings({ "nls" }) public class TmfRequestExecutorTest extends TestCase { - // ------------------------------------------------------------------------ - // Variables - // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + // Variables + // ------------------------------------------------------------------------ + + private TmfRequestExecutor fExecutor; // ------------------------------------------------------------------------ // Housekeeping @@ -41,15 +49,18 @@ public class TmfRequestExecutorTest extends TestCase { super(name); } - @Override - protected void setUp() throws Exception { - super.setUp(); - } + @Override + protected void setUp() throws Exception { + super.setUp(); + fExecutor = new TmfRequestExecutor(); - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + fExecutor.stop(); + } // ------------------------------------------------------------------------ // Constructors @@ -60,17 +71,6 @@ public class TmfRequestExecutorTest extends TestCase { */ public void testTmfRequestExecutor() { TmfRequestExecutor executor = new TmfRequestExecutor(); - assertEquals("nbPendingRequests", 0, executor.getNbPendingRequests()); - assertFalse("isShutdown", executor.isShutdown()); - assertFalse("isTerminated", executor.isTerminated()); - } - - /** - * Test method for {@link org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor#TmfRequestExecutor(java.util.concurrent.ExecutorService)}. - */ - public void testTmfRequestExecutorExecutorService() { - TmfRequestExecutor executor = new TmfRequestExecutor(Executors.newCachedThreadPool()); - assertEquals("nbPendingRequests", 0, executor.getNbPendingRequests()); assertFalse("isShutdown", executor.isShutdown()); assertFalse("isTerminated", executor.isTerminated()); } @@ -81,7 +81,6 @@ public class TmfRequestExecutorTest extends TestCase { public void testStop() { TmfRequestExecutor executor = new TmfRequestExecutor(); executor.stop(); - assertEquals("nbPendingRequests", 0, executor.getNbPendingRequests()); assertTrue("isShutdown", executor.isShutdown()); assertTrue("isTerminated", executor.isTerminated()); } @@ -90,12 +89,172 @@ public class TmfRequestExecutorTest extends TestCase { // execute // ------------------------------------------------------------------------ - /** + // Dummy context + private static class MyContext implements ITmfContext { + private long fNbRequested; + private long fRank; + + public MyContext(long requested) { + fNbRequested = requested; + fRank = 0; + } + @Override + public long getRank() { + return (fRank <= fNbRequested) ? fRank : -1; + } + @Override + public ITmfLocation getLocation() { + return null; + } + @Override + public boolean hasValidRank() { + return true; + } + @Override + public void setLocation(ITmfLocation location) { + } + @Override + public void setRank(long rank) { + fRank = rank; + } + @Override + public void increaseRank() { + fRank++; + } + @Override + public void dispose() { + } + @Override + public MyContext clone() { + return this; + } + } + + // Dummy provider + private static class MyProvider extends TmfDataProvider { + private ITmfEvent fEvent = new TmfEvent(); + + @Override + public String getName() { + return null; + } + @Override + public void dispose() { + } + @Override + public void broadcast(TmfSignal signal) { + } + @Override + public void sendRequest(ITmfDataRequest request) { + } + @Override + public void fireRequest() { + } + @Override + public void notifyPendingRequest(boolean isIncrement) { + } + @Override + public ITmfEvent getNext(ITmfContext context) { + context.increaseRank(); + return context.getRank() >= 0 ? fEvent : null; + } + @Override + public ITmfContext armRequest(ITmfDataRequest request) { + return new MyContext(request.getNbRequested()); + } + } + + // Dummy request + private static class MyRequest extends TmfDataRequest { + public MyRequest(ExecutionType priority, int requested) { + super(ITmfEvent.class, 0, requested, priority); + } + @Override + public void done() { + synchronized (monitor) { + monitor.notifyAll(); + } + } + } + + // Dummy thread + private static class MyThread extends TmfEventThread { + public MyThread(TmfDataProvider provider, ITmfDataRequest request) { + super(provider, request); + } + } + + private final static Object monitor = new Object(); + + /** * Test method for {@link org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor#execute(java.lang.Runnable)}. */ public void testExecute() { -// fail("Not yet implemented"); - } + MyProvider provider = new MyProvider(); + MyRequest request1 = new MyRequest(ExecutionType.BACKGROUND, Integer.MAX_VALUE / 5); + MyThread thread1 = new MyThread(provider, request1); + MyRequest request2 = new MyRequest(ExecutionType.FOREGROUND, Integer.MAX_VALUE / 10); + MyThread thread2 = new MyThread(provider, request2); + MyRequest request3 = new MyRequest(ExecutionType.FOREGROUND, Integer.MAX_VALUE / 10); + MyThread thread3 = new MyThread(provider, request3); + + // Start thread1 + fExecutor.execute(thread1); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + assertTrue("isRunning", thread1.isRunning()); + + // Start higher priority thread2 + fExecutor.execute(thread2); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + assertFalse("isRunning", thread1.isRunning()); + assertTrue("isRunning", thread2.isRunning()); + + // Wait for end of thread2 + try { + synchronized (monitor) { + monitor.wait(); + Thread.sleep(1000); + } + } catch (InterruptedException e) { + } + assertTrue("isCompleted", thread2.isCompleted()); + assertTrue("isRunning", thread1.isRunning()); + + // Start higher priority thread3 + fExecutor.execute(thread3); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + assertFalse("isRunning", thread1.isRunning()); + assertTrue("isRunning", thread3.isRunning()); + + // Wait for end of thread3 + try { + synchronized (monitor) { + monitor.wait(); + Thread.sleep(500); + } + } catch (InterruptedException e) { + } + assertTrue("isCompleted", thread3.isCompleted()); + assertTrue("isRunning", thread1.isRunning()); + + // Wait for thread1 completion + try { + synchronized (monitor) { + monitor.wait(); + } + } catch (InterruptedException e) { + } + assertTrue("isCompleted", thread1.isCompleted()); + } // ------------------------------------------------------------------------ // toString @@ -105,13 +264,9 @@ public class TmfRequestExecutorTest extends TestCase { * Test method for {@link org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor#toString()}. */ public void testToString() { -// TmfRequestExecutor executor1 = new TmfRequestExecutor(); -// String expected1 = "[TmfRequestExecutor(DelegatedExecutorService)]"; -// assertEquals("toString", expected1, executor1.toString()); -// -// TmfRequestExecutor executor2 = new TmfRequestExecutor(Executors.newCachedThreadPool()); -// String expected2 = "[TmfRequestExecutor(ThreadPoolExecutor)]"; -// assertEquals("toString", expected2, executor2.toString()); + TmfRequestExecutor executor = new TmfRequestExecutor(); + String expected = "[TmfRequestExecutor(ThreadPoolExecutor)]"; + assertEquals("toString", expected, executor.toString()); } } diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfEventThread.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfEventThread.java new file mode 100644 index 0000000000..97c95cbdc6 --- /dev/null +++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfEventThread.java @@ -0,0 +1,245 @@ +/******************************************************************************* + * Copyright (c) 2012 Ericsson + * + * All rights reserved. This program and the accompanying materials are + * made available under the terms of the Eclipse Public License v1.0 which + * accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Francois Chouinard - Initial API and implementation + *******************************************************************************/ + +package org.eclipse.linuxtools.internal.tmf.core.component; + +import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer; +import org.eclipse.linuxtools.tmf.core.component.ITmfDataProvider; +import org.eclipse.linuxtools.tmf.core.component.TmfDataProvider; +import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; +import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest; +import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType; +import org.eclipse.linuxtools.tmf.core.trace.ITmfContext; + +/** + * Provides the core event request processor. It also has support for suspending + * and resuming a request in a thread-safe manner. + * + * @author Francois Chouinard + * @version 1.0 + */ +public class TmfEventThread implements Runnable { + + // ------------------------------------------------------------------------ + // Attributes + // ------------------------------------------------------------------------ + + /** + * The event provider + */ + private final TmfDataProvider fProvider; + + /** + * The wrapped event request + */ + private final ITmfDataRequest fRequest; + + /** + * The request execution priority + */ + private final ExecutionType fExecType; + + /** + * The wrapped thread (if applicable) + */ + private final TmfEventThread fThread; + + /** + * The thread execution state + */ + private volatile boolean isPaused = false; + private volatile boolean isCompleted = false; + + /** + * The synchronization object + */ + private final Object object = new Object(); + + // ------------------------------------------------------------------------ + // Constructor + // ------------------------------------------------------------------------ + + /** + * Basic constructor + * + * @param provider the event provider + * @param request the request to process + */ + public TmfEventThread(TmfDataProvider provider, ITmfDataRequest request) { + assert provider != null; + assert request != null; + fProvider = provider; + fRequest = request; + fExecType = request.getExecType(); + fThread = null; + } + + /** + * Wrapper constructor + * + * @param thread the thread to wrap + */ + public TmfEventThread(TmfEventThread thread) { + fProvider = thread.fProvider; + fRequest = thread.fRequest; + fExecType = thread.fExecType; + fThread = thread; + } + + // ------------------------------------------------------------------------ + // Getters + // ------------------------------------------------------------------------ + + /** + * @return The wrapped thread + */ + public TmfEventThread getThread() { + return fThread; + } + + /** + * @return The event provider + */ + public ITmfDataProvider getProvider() { + return fProvider; + } + + /** + * @return The event request + */ + public ITmfDataRequest getRequest() { + return fRequest; + } + + /** + * @return The request execution priority + */ + public ExecutionType getExecType() { + return fExecType; + } + + /** + * @return The request execution state + */ + public boolean isRunning() { + return fRequest.isRunning() && !isPaused; + } + + /** + * @return The request execution state + */ + public boolean isCompleted() { + return isCompleted; + } + + // ------------------------------------------------------------------------ + // Runnable + // ------------------------------------------------------------------------ + + /* (non-Javadoc) + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + + TmfCoreTracer.traceRequest(fRequest, "is being serviced by " + fProvider.getName()); //$NON-NLS-1$ + + // Extract the generic information + fRequest.start(); + int nbRequested = fRequest.getNbRequested(); + int nbRead = 0; + isCompleted = false; + + // Initialize the execution + ITmfContext context = fProvider.armRequest(fRequest); + if (context == null) { + fRequest.cancel(); + return; + } + + try { + // Get the ordered events + ITmfEvent event = fProvider.getNext(context); + TmfCoreTracer.traceRequest(fRequest, "read first event"); //$NON-NLS-1$ + + while (event != null && !fProvider.isCompleted(fRequest, event, nbRead)) { + if (isPaused) { + try { + while (isPaused) { + synchronized (object) { + object.wait(); + } + } + } catch (InterruptedException e) { + } + } + + TmfCoreTracer.traceEvent(fProvider, fRequest, event); + if (fRequest.getDataType().isInstance(event)) { + fRequest.handleData(event); + } + + // To avoid an unnecessary read passed the last event requested + if (++nbRead < nbRequested) { + event = fProvider.getNext(context); + } + } + + isCompleted = true; + + if (fRequest.isCancelled()) { + fRequest.cancel(); + } else { + fRequest.done(); + } + + } catch (Exception e) { + fRequest.fail(); + } + + // Cleanup + context.dispose(); + } + + // ------------------------------------------------------------------------ + // Operations + // ------------------------------------------------------------------------ + + /** + * Suspend the thread + */ + public synchronized void suspend() { + isPaused = true; + TmfCoreTracer.traceRequest(fRequest, "SUSPENDED"); //$NON-NLS-1$ + } + + /** + * Resume the thread + */ + public synchronized void resume() { + isPaused = false; + synchronized (object) { + object.notifyAll(); + } + TmfCoreTracer.traceRequest(fRequest, "RESUMED"); //$NON-NLS-1$ + } + + /** + * Cancel the request + */ + public void cancel() { + if (!fRequest.isCompleted()) { + fRequest.cancel(); + } + } + +} diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfThread.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfThread.java deleted file mode 100644 index 9453bf23ed..0000000000 --- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/component/TmfThread.java +++ /dev/null @@ -1,50 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2010 Ericsson - * - * All rights reserved. This program and the accompanying materials are - * made available under the terms of the Eclipse Public License v1.0 which - * accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Francois Chouinard - Initial API and implementation - *******************************************************************************/ - -package org.eclipse.linuxtools.internal.tmf.core.component; - -import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType; - -/** - * Utility class to add an execution class to a simple Java thread - * - * @version 1.0 - * @author Francois Chouinard - */ -public class TmfThread extends Thread { - - private final ExecutionType fExecType; - - /** - * Standard constructor - * - * @param execType - * The data request's ExecutionType - */ - public TmfThread(ExecutionType execType) { - fExecType = execType; - } - - /** - * @return The Execution type - */ - public ExecutionType getExecType() { - return fExecType; - } - - /** - * Cancel this request - */ - public void cancel() { - } - -} diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/request/TmfRequestExecutor.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/request/TmfRequestExecutor.java index 09e8716822..2be6143934 100644 --- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/request/TmfRequestExecutor.java +++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/internal/tmf/core/request/TmfRequestExecutor.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2009, 2010 Ericsson + * Copyright (c) 2009, 2010, 2012 Ericsson * * All rights reserved. This program and the accompanying materials are * made available under the terms of the Eclipse Public License v1.0 which @@ -8,44 +8,44 @@ * * Contributors: * Francois Chouinard - Initial API and implementation + * Francois Chouinard - Added support for pre-emption *******************************************************************************/ package org.eclipse.linuxtools.internal.tmf.core.request; -import java.util.Comparator; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer; -import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread; +import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread; import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType; /** * A simple, straightforward request executor. * - * @version 1.0 * @author Francois Chouinard + * @version 1.1 */ public class TmfRequestExecutor implements Executor { - private final ExecutorService fExecutor; + // ------------------------------------------------------------------------ + // Attributes + // ------------------------------------------------------------------------ + + // The request executor + private final ExecutorService fExecutor = Executors.newFixedThreadPool(2); private final String fExecutorName; - private final PriorityBlockingQueue<TmfThread> fRequestQueue = new PriorityBlockingQueue<TmfThread>( - 100, new Comparator<TmfThread>() { - @Override - public int compare(TmfThread o1, TmfThread o2) { - if (o1.getExecType() == o2.getExecType()) { - return 0; - } - if (o1.getExecType() == ExecutionType.BACKGROUND) { - return 1; - } - return -1; - } - }); - private TmfThread fCurrentRequest; + + // The request queues + private final Queue<TmfEventThread> fHighPriorityTasks = new ArrayBlockingQueue<TmfEventThread>(100); + private final Queue<TmfEventThread> fLowPriorityTasks = new ArrayBlockingQueue<TmfEventThread>(100); + + // The tasks + private TmfEventThread fActiveTask; + private TmfEventThread fSuspendedTask; // ------------------------------------------------------------------------ // Constructors @@ -55,7 +55,11 @@ public class TmfRequestExecutor implements Executor { * Default constructor */ public TmfRequestExecutor() { - this(Executors.newSingleThreadExecutor()); + String canonicalName = fExecutor.getClass().getCanonicalName(); + fExecutorName = canonicalName.substring(canonicalName.lastIndexOf('.') + 1); + if (TmfCoreTracer.isComponentTraced()) { + TmfCoreTracer.trace(fExecutor + " created"); //$NON-NLS-1$ + } } /** @@ -63,21 +67,21 @@ public class TmfRequestExecutor implements Executor { * * @param executor The executor service to use */ + @Deprecated public TmfRequestExecutor(ExecutorService executor) { - fExecutor = executor; - String canonicalName = fExecutor.getClass().getCanonicalName(); - fExecutorName = canonicalName.substring(canonicalName.lastIndexOf('.') + 1); - if (TmfCoreTracer.isComponentTraced()) - { - TmfCoreTracer.trace(fExecutor + " created"); //$NON-NLS-1$ - } + this(); } + // ------------------------------------------------------------------------ + // Getters + // ------------------------------------------------------------------------ + /** * @return the number of pending requests */ + @Deprecated public synchronized int getNbPendingRequests() { - return fRequestQueue.size(); + return fHighPriorityTasks.size() + fLowPriorityTasks.size(); } /** @@ -94,64 +98,84 @@ public class TmfRequestExecutor implements Executor { return fExecutor.isTerminated(); } - /** - * Stops the executor - */ - public synchronized void stop() { - if (fCurrentRequest != null) { - fCurrentRequest.cancel(); - } - - while ((fCurrentRequest = fRequestQueue.poll()) != null) { - fCurrentRequest.cancel(); - } - - fExecutor.shutdown(); - if (TmfCoreTracer.isComponentTraced()) - { - TmfCoreTracer.trace(fExecutor + " terminated"); //$NON-NLS-1$ - } - } - - // ------------------------------------------------------------------------ - // Operations - // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + // Operations + // ------------------------------------------------------------------------ /* (non-Javadoc) * @see java.util.concurrent.Executor#execute(java.lang.Runnable) */ @Override - public synchronized void execute(final Runnable requestThread) { - fRequestQueue.offer(new TmfThread(((TmfThread) requestThread).getExecType()) { - @Override - public void run() { - try { - requestThread.run(); - } finally { - scheduleNext(); - } - } + public synchronized void execute(final Runnable command) { + + // We are expecting MyEventThread:s + if (!(command instanceof TmfEventThread)) { + // TODO: Log an error + return; + } + + // Wrap the thread in a MyThread + TmfEventThread thread = (TmfEventThread) command; + TmfEventThread wrapper = new TmfEventThread(thread) { @Override - public void cancel() { - ((TmfThread) requestThread).cancel(); + public void run() { + try { + command.run(); + } finally { + scheduleNext(); + } } - }); - if (fCurrentRequest == null) { - scheduleNext(); - } + }; + + // Add the thread to the appropriate queue + ExecutionType priority = thread.getExecType(); + (priority == ExecutionType.FOREGROUND ? fHighPriorityTasks : fLowPriorityTasks).offer(wrapper); + + // Schedule or preempt as appropriate + if (fActiveTask == null) { + scheduleNext(); + } else if (priority == ExecutionType.FOREGROUND && priority != fActiveTask.getExecType()) { + fActiveTask.getThread().suspend(); + fSuspendedTask = fActiveTask; + scheduleNext(); + } } /** * Executes the next pending request, if applicable. */ protected synchronized void scheduleNext() { - if ((fCurrentRequest = fRequestQueue.poll()) != null) { - if (!isShutdown()) { - fExecutor.execute(fCurrentRequest); - } - } + if (!isShutdown()) { + if ((fActiveTask = fHighPriorityTasks.poll()) != null) { + fExecutor.execute(fActiveTask); + } else if (fSuspendedTask != null) { + fActiveTask = fSuspendedTask; + fSuspendedTask = null; + fActiveTask.getThread().resume(); + } else if ((fActiveTask = fLowPriorityTasks.poll()) != null) { + fExecutor.execute(fActiveTask); + } + } } + /** + * Stops the executor + */ + public synchronized void stop() { + if (fActiveTask != null) { + fActiveTask.cancel(); + } + + while ((fActiveTask = fHighPriorityTasks.poll()) != null) { + fActiveTask.cancel(); + } + + fExecutor.shutdown(); + if (TmfCoreTracer.isComponentTraced()) { + TmfCoreTracer.trace(fExecutor + " terminated"); //$NON-NLS-1$ + } + } + // ------------------------------------------------------------------------ // Object // ------------------------------------------------------------------------ diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfDataProvider.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfDataProvider.java index 0341ef56c5..b24624dfaf 100644 --- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfDataProvider.java +++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfDataProvider.java @@ -8,6 +8,7 @@ * * Contributors: * Francois Chouinard - Initial API and implementation + * Francois Chouinard - Replace background requests by pre-emptable requests *******************************************************************************/ package org.eclipse.linuxtools.tmf.core.component; @@ -18,8 +19,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer; +import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread; import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager; -import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread; import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest; import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor; import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; @@ -40,10 +41,9 @@ import org.eclipse.linuxtools.tmf.core.trace.ITmfContext; * The concrete class can either re-implement processRequest() entirely or just * implement the hooks (initializeContext() and getNext()). * <p> - * TODO: Add support for providing multiple data types. * - * @version 1.0 * @author Francois Chouinard + * @version 1.1 */ public abstract class TmfDataProvider extends TmfComponent implements ITmfDataProvider { @@ -321,78 +321,12 @@ public abstract class TmfDataProvider extends TmfComponent implements ITmfDataPr return; } - final TmfDataProvider provider = this; - - // Process the request - TmfThread thread = new TmfThread(request.getExecType()) { - - @Override - public void run() { - - if (TmfCoreTracer.isRequestTraced()) { - TmfCoreTracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$ - } - - // Extract the generic information - request.start(); - int nbRequested = request.getNbRequested(); - int nbRead = 0; - - // Initialize the execution - ITmfContext context = armRequest(request); - if (context == null) { - request.cancel(); - return; - } - - try { - // Get the ordered events - ITmfEvent data = getNext(context); - if (TmfCoreTracer.isRequestTraced()) { - TmfCoreTracer.traceRequest(request, "read first event"); //$NON-NLS-1$ - } - while (data != null && !isCompleted(request, data, nbRead)) { - if (fLogData) { - TmfCoreTracer.traceEvent(provider, request, data); - } - if (request.getDataType().isInstance(data)) { - request.handleData(data); - } - - // To avoid an unnecessary read passed the last data - // requested - if (++nbRead < nbRequested) { - data = getNext(context); - } - } - if (TmfCoreTracer.isRequestTraced()) { - TmfCoreTracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$ - } - - if (request.isCancelled()) { - request.cancel(); - } else { - request.done(); - } - } catch (Exception e) { - request.fail(); - } - - // Cleanup - context.dispose(); - } - - @Override - public void cancel() { - if (!request.isCompleted()) { - request.cancel(); - } - } - }; + TmfEventThread thread = new TmfEventThread(this, request); if (TmfCoreTracer.isRequestTraced()) { TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$ } + fExecutor.execute(thread); } @@ -406,87 +340,8 @@ public abstract class TmfDataProvider extends TmfComponent implements ITmfDataPr * @param indexing * Should we index the chunks */ - protected void queueBackgroundRequest(final ITmfDataRequest request, - final int blockSize, final boolean indexing) { - - final TmfDataProvider provider = this; - - Thread thread = new Thread() { - @Override - public void run() { - - if (TmfCoreTracer.isRequestTraced()) { - TmfCoreTracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$ - } - - request.start(); - - final Integer[] CHUNK_SIZE = new Integer[1]; - CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0)); - - final Integer[] nbRead = new Integer[1]; - nbRead[0] = 0; - - final Boolean[] isFinished = new Boolean[1]; - isFinished[0] = Boolean.FALSE; - - while (!isFinished[0]) { - - TmfDataRequest subRequest = new TmfDataRequest(request.getDataType(), request.getIndex() - + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) { - - @Override - public synchronized boolean isCompleted() { - return super.isCompleted() || request.isCompleted(); - } - - @Override - public void handleData(ITmfEvent data) { - super.handleData(data); - if (request.getDataType().isInstance(data)) { - request.handleData(data); - } - if (getNbRead() > CHUNK_SIZE[0]) { - System.out.println("ERROR - Read too many events"); //$NON-NLS-1$ - } - } - - @Override - public void handleCompleted() { - nbRead[0] += getNbRead(); - if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) { - if (this.isCancelled()) { - request.cancel(); - } else if (this.isFailed()) { - request.fail(); - } else { - request.done(); - } - isFinished[0] = Boolean.TRUE; - } - super.handleCompleted(); - } - }; - - if (!isFinished[0]) { - queueRequest(subRequest); - - try { - subRequest.waitForCompletion(); - if (request.isCompleted()) { - isFinished[0] = Boolean.TRUE; - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - - CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize); - } - } - } - }; - - thread.start(); + protected void queueBackgroundRequest(final ITmfDataRequest request, final int blockSize, final boolean indexing) { + queueRequest(request); } /** @@ -498,7 +353,7 @@ public abstract class TmfDataProvider extends TmfComponent implements ITmfDataPr * @return Sn application specific context; null if request can't be * serviced */ - protected abstract ITmfContext armRequest(ITmfDataRequest request); + public abstract ITmfContext armRequest(ITmfDataRequest request); // /** // * Return the next event based on the context supplied. The context diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfEventProvider.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfEventProvider.java index 12ce5b59ec..cb54d2e2f4 100644 --- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfEventProvider.java +++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/component/TmfEventProvider.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2009, 2010 Ericsson + * Copyright (c) 2009, 2010, 2012 Ericsson * * All rights reserved. This program and the accompanying materials are * made available under the terms of the Eclipse Public License v1.0 which @@ -8,6 +8,7 @@ * * Contributors: * Francois Chouinard - Initial API and implementation + * Francois Chouinard - Replace background requests by pre-emptable requests *******************************************************************************/ package org.eclipse.linuxtools.tmf.core.component; @@ -17,15 +18,13 @@ import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedEventRequest import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; import org.eclipse.linuxtools.tmf.core.event.ITmfTimestamp; import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest; -import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType; import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest; -import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest; /** * An extension of TmfDataProvider timestamped events providers. * - * @version 1.0 * @author Francois Chouinard + * @version 1.1 */ public abstract class TmfEventProvider extends TmfDataProvider { @@ -112,95 +111,4 @@ public abstract class TmfEventProvider extends TmfDataProvider { } } - @Override - protected void queueBackgroundRequest(final ITmfDataRequest request, final int blockSize, final boolean indexing) { - - if (! (request instanceof ITmfEventRequest)) { - super.queueBackgroundRequest(request, blockSize, indexing); - return; - } - - final TmfDataProvider provider = this; - - Thread thread = new Thread() { - @Override - public void run() { - - if (TmfCoreTracer.isRequestTraced()) { - TmfCoreTracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$ - } - - request.start(); - - final Integer[] CHUNK_SIZE = new Integer[1]; - CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0)); - - final Integer[] nbRead = new Integer[1]; - nbRead[0] = 0; - - final Boolean[] isFinished = new Boolean[1]; - isFinished[0] = Boolean.FALSE; - - long startIndex = request.getIndex(); - - while (!isFinished[0]) { - - TmfEventRequest subRequest= new TmfEventRequest(request.getDataType(), ((ITmfEventRequest) request).getRange(), startIndex + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) { - - @Override - public synchronized boolean isCompleted() { - return super.isCompleted() || request.isCompleted(); - } - - @Override - public void handleData(ITmfEvent data) { - super.handleData(data); - if (request.getDataType().isInstance(data)) { - request.handleData(data); - } - if (this.getNbRead() > CHUNK_SIZE[0]) { - System.out.println("ERROR - Read too many events"); //$NON-NLS-1$ - } - } - - @Override - public void handleCompleted() { - nbRead[0] += this.getNbRead(); - if (nbRead[0] >= request.getNbRequested() || (this.getNbRead() < CHUNK_SIZE[0])) { - if (this.isCancelled()) { - request.cancel(); - } else if (this.isFailed()) { - request.fail(); - } else { - request.done(); - } - isFinished[0] = Boolean.TRUE; - } - super.handleCompleted(); - } - }; - - if (!isFinished[0]) { - queueRequest(subRequest); - - try { - subRequest.waitForCompletion(); - if (request.isCompleted()) { - isFinished[0] = Boolean.TRUE; - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - - if (startIndex == 0 && nbRead[0].equals(CHUNK_SIZE[0])) { // do this only once if the event request index is unknown - startIndex = subRequest.getIndex(); // update the start index with the index of the first subrequest's - } // start time event which was set during the arm request - CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize); - } - } - } - }; - - thread.start(); - } } diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfExperiment.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfExperiment.java index eda777efdd..fa9c948338 100644 --- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfExperiment.java +++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfExperiment.java @@ -202,7 +202,7 @@ public class TmfExperiment extends TmfTrace implements ITmfEventParser { * @see org.eclipse.linuxtools.tmf.core.trace.TmfTrace#armRequest(org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest) */ @Override - protected synchronized ITmfContext armRequest(final ITmfDataRequest request) { + public synchronized ITmfContext armRequest(final ITmfDataRequest request) { // Make sure we have something to read from if (fTraces == null) { diff --git a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfTrace.java b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfTrace.java index 6f776496de..e59637fb1f 100644 --- a/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfTrace.java +++ b/lttng/org.eclipse.linuxtools.tmf.core/src/org/eclipse/linuxtools/tmf/core/trace/TmfTrace.java @@ -674,7 +674,7 @@ public abstract class TmfTrace extends TmfEventProvider implements ITmfTrace { * @see org.eclipse.linuxtools.tmf.core.component.TmfDataProvider#armRequest(org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest) */ @Override - protected synchronized ITmfContext armRequest(final ITmfDataRequest request) { + public synchronized ITmfContext armRequest(final ITmfDataRequest request) { if (executorIsShutdown()) { return null; } |