diff options
author | Pawel Piech | 2010-04-23 20:21:03 +0000 |
---|---|---|
committer | Pawel Piech | 2010-04-23 20:21:03 +0000 |
commit | 8a353a17587ad32197ebb8a094e880ef66d8e8e9 (patch) | |
tree | 58a72149c32d49995e485a42c155ebf3891f0998 | |
parent | ded34fbdbb1196552dc997a6a0b5ac7c0a4398fc (diff) | |
download | org.eclipse.cdt-8a353a17587ad32197ebb8a094e880ef66d8e8e9.tar.gz org.eclipse.cdt-8a353a17587ad32197ebb8a094e880ef66d8e8e9.tar.xz org.eclipse.cdt-8a353a17587ad32197ebb8a094e880ef66d8e8e9.zip |
Bug 310335 - [concurrent] Query utility does not propagate a cancel requests
-rw-r--r-- | dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Query.java | 235 | ||||
-rw-r--r-- | dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/DsfQueryTests.java | 144 |
2 files changed, 219 insertions, 160 deletions
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Query.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Query.java index 99eb178d709..84165307078 100644 --- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Query.java +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Query.java @@ -15,9 +15,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; +import org.eclipse.cdt.dsf.internal.DsfPlugin; import org.eclipse.core.runtime.CoreException; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; /** @@ -54,18 +56,90 @@ import org.eclipse.core.runtime.CoreException; abstract public class Query<V> extends DsfRunnable implements Future<V> { - /** The synchronization object for this query */ - private final Sync fSync = new Sync(); + private class QueryRm extends DataRequestMonitor<V> { + boolean fExecuted = false; + + boolean fCompleted = false; + + private QueryRm() { + super(ImmediateExecutor.getInstance(), null); + } + + @Override + public synchronized void handleCompleted() { + fCompleted = true; + notifyAll(); + } + + public synchronized boolean isCompleted() { + return fCompleted; + } + + public synchronized boolean setExecuted() { + if (fExecuted || isCanceled()) { + // already executed or canceled + return false; + } + fExecuted = true; + return true; + } + + public synchronized boolean isExecuted() { + return fExecuted; + } + }; + + private final QueryRm fRm = new QueryRm(); + /** * The no-argument constructor */ public Query() {} - - public V get() throws InterruptedException, ExecutionException { return fSync.doGet(); } + + public V get() throws InterruptedException, ExecutionException { + IStatus status; + V data; + synchronized (fRm) { + while (!isDone()) { + fRm.wait(); + } + status = fRm.getStatus(); + data = fRm.getData(); + } + + if (status.getSeverity() == IStatus.CANCEL) { + throw new CancellationException(); + } else if (status.getSeverity() != IStatus.OK) { + throw new ExecutionException(new CoreException(status)); + } + return data; + } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return fSync.doGet(unit.toNanos(timeout)); + long timeLeft = unit.toMillis(timeout); + long timeoutTime = System.currentTimeMillis() + unit.toMillis(timeout); + + IStatus status; + V data; + synchronized (fRm) { + while (!isDone()) { + if (timeLeft <= 0) { + throw new TimeoutException(); + } + fRm.wait(timeLeft); + timeLeft = timeoutTime - System.currentTimeMillis(); + } + status = fRm.getStatus(); + data = fRm.getData(); + } + + if (status.getSeverity() == IStatus.CANCEL) { + throw new CancellationException(); + } else if (status.getSeverity() != IStatus.OK) { + throw new ExecutionException(new CoreException(status)); + } + return data; } /** @@ -73,139 +147,44 @@ abstract public class Query<V> extends DsfRunnable * if set. */ public boolean cancel(boolean mayInterruptIfRunning) { - return fSync.doCancel(); + boolean completed = false; + synchronized (fRm) { + completed = fRm.isCompleted(); + if (!completed) { + fRm.cancel(); + } + } + return !completed; } - public boolean isCancelled() { return fSync.doIsCancelled(); } + public boolean isCancelled() { return fRm.isCanceled(); } - public boolean isDone() { return fSync.doIsDone(); } - + public boolean isDone() { + synchronized (fRm) { + return fRm.isCompleted() || (fRm.isCanceled() && !fRm.isExecuted()); + } + } - protected void doneException(Throwable t) { - fSync.doSetException(t); - } abstract protected void execute(DataRequestMonitor<V> rm); public void run() { - if (fSync.doRun()) { - try { - /* - * Create the executor which is going to handle the completion of the - * request monitor. Normally a DSF executor is supplied here which - * causes the request monitor to be invoked in a new dispatch loop. - * But since the query is a synchronization object, it can handle - * the completion of the request in any thread. - * Avoiding the use of a DSF executor is very useful because queries are - * meant to be used by clients calling from non-dispatch thread, and there - * is a chance that a client may execute a query just as a session is being - * shut down. In that case, the DSF executor may throw a - * RejectedExecutionException which would have to be handled by the query. - */ - execute(new DataRequestMonitor<V>(ImmediateExecutor.getInstance(), null) { - @Override - public void handleCompleted() { - if (isSuccess()) fSync.doSet(getData()); - else fSync.doSetException(new CoreException(getStatus())); - } - }); - } catch(Throwable t) { - /* - * Catching the exception here will only work if the exception - * happens within the execute. It will not work in cases when - * the execute submits other runnables, and the other runnables - * encounter the exception. - */ - fSync.doSetException(t); - - /* - * Since we caught the exception, it will not be logged by - * DefaultDsfExecutable.afterExecution(). So log it here. - */ - DefaultDsfExecutor.logException(t); - } + if (fRm.setExecuted()) { + execute(fRm); } } - - @SuppressWarnings("serial") - final class Sync extends AbstractQueuedSynchronizer { - private static final int STATE_RUNNING = 1; - private static final int STATE_DONE = 2; - private static final int STATE_CANCELLED = 4; - - private V fResult; - private Throwable fException; - - private boolean ranOrCancelled(int state) { - return (state & (STATE_DONE | STATE_CANCELLED)) != 0; - } - - @Override - protected int tryAcquireShared(int ignore) { - return doIsDone()? 1 : -1; - } - - @Override - protected boolean tryReleaseShared(int ignore) { - return true; - } - - boolean doIsCancelled() { - return getState() == STATE_CANCELLED; - } - - boolean doIsDone() { - return ranOrCancelled(getState()); - } - - V doGet() throws InterruptedException, ExecutionException { - acquireSharedInterruptibly(0); - if (getState() == STATE_CANCELLED) throw new CancellationException(); - if (fException != null) throw new ExecutionException(fException); - return fResult; - } - - V doGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { - if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); - if (getState() == STATE_CANCELLED) throw new CancellationException(); - if (fException != null) throw new ExecutionException(fException); - return fResult; - } - void doSet(V v) { - while(true) { - int s = getState(); - if (ranOrCancelled(s)) return; - if (compareAndSetState(s, STATE_DONE)) break; - } - fResult = v; - releaseShared(0); - } - - void doSetException(Throwable t) { - while(true) { - int s = getState(); - if (ranOrCancelled(s)) return; - if (compareAndSetState(s, STATE_DONE)) break; - } - fException = t; - fResult = null; - releaseShared(0); - } - - boolean doCancel() { - while(true) { - int s = getState(); - if (ranOrCancelled(s)) return false; - if (compareAndSetState(s, STATE_CANCELLED)) break; - } - releaseShared(0); - return true; - } + /** + * Completes the query with the given exception. + * + * @deprecated Query implementations should call the request monitor to + * set the exception status directly. + */ + protected void doneException(Throwable t) { + fRm.setStatus(new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfStatusConstants.INTERNAL_ERROR, "Exception", t)); //$NON-NLS-1$ + fRm.done(); + } - boolean doRun() { - return compareAndSetState(0, STATE_RUNNING); - } - } + } diff --git a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/DsfQueryTests.java b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/DsfQueryTests.java index 6c78e535019..7da7f99aea4 100644 --- a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/DsfQueryTests.java +++ b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/DsfQueryTests.java @@ -19,13 +19,14 @@ import junit.framework.Assert; import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; import org.eclipse.cdt.dsf.concurrent.DsfRunnable; +import org.eclipse.cdt.dsf.concurrent.IDsfStatusConstants; import org.eclipse.cdt.dsf.concurrent.Query; +import org.eclipse.cdt.dsf.concurrent.RequestMonitor; +import org.eclipse.cdt.dsf.concurrent.RequestMonitor.ICanceledListener; import org.eclipse.cdt.tests.dsf.DsfTestPlugin; import org.eclipse.cdt.tests.dsf.TestDsfExecutor; -import org.eclipse.core.runtime.IProgressMonitor; import org.eclipse.core.runtime.IStatus; import org.eclipse.core.runtime.Status; -import org.eclipse.core.runtime.jobs.Job; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -76,6 +77,65 @@ public class DsfQueryTests { } @Test + public void getErrorTest() throws InterruptedException, ExecutionException { + final String error_message = "Test Error"; + + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + rm.setStatus(new Status(IStatus.ERROR, DsfTestPlugin.PLUGIN_ID, IDsfStatusConstants.INTERNAL_ERROR, error_message, null)); //$NON-NLS-1$ + rm.done(); + } + }; + + // Check initial state + Assert.assertTrue(!q.isDone()); + Assert.assertTrue(!q.isCancelled()); + + fExecutor.execute(q); + + try { + q.get(); + Assert.fail("Expected exception"); + } catch (ExecutionException e) { + Assert.assertEquals(e.getCause().getMessage(), error_message); + } + + // Check final state + Assert.assertTrue(q.isDone()); + Assert.assertTrue(!q.isCancelled()); + + } + + @Test + public void doneExceptionTest() throws InterruptedException, ExecutionException { + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + doneException(new Throwable()); + } + }; + + // Check initial state + Assert.assertTrue(!q.isDone()); + Assert.assertTrue(!q.isCancelled()); + + fExecutor.execute(q); + + try { + q.get(); + Assert.fail("Expected exception"); + } catch (ExecutionException e) { + } + + // Check final state + Assert.assertTrue(q.isDone()); + Assert.assertTrue(!q.isCancelled()); + + } + + + @Test public void getWithMultipleDispatchesTest() throws InterruptedException, ExecutionException { Query<Integer> q = new Query<Integer>() { @Override @@ -117,32 +177,26 @@ public class DsfQueryTests { } @Test - public void cancelWhileWaitingTest() throws InterruptedException, ExecutionException { + public void cancelBeforeWaitingTest() throws InterruptedException, ExecutionException { final Query<Integer> q = new Query<Integer>() { - @Override - protected void execute(final DataRequestMonitor<Integer> rm) { - // Call done with a delay of 1 second, to avoid stalling the tests. - fExecutor.schedule( - new DsfRunnable() { - public void run() { rm.done(); } - }, - 1, TimeUnit.SECONDS); + @Override protected void execute(final DataRequestMonitor<Integer> rm) { + Assert.fail("Query was cancelled, it should not be called."); //$NON-NLS-1$ + rm.done(); } }; + + // Cancel before invoking the query. + q.cancel(false); - fExecutor.execute(q); + Assert.assertTrue(q.isDone()); + Assert.assertTrue(q.isCancelled()); - // Note: no point in checking isDone() and isCancelled() here, because - // the value could change on timing. + // Start the query. + fExecutor.execute(q); - // This does not really guarantee that the cancel will be called after - // the call to Fugure.get(), but the 1ms delay in call to schedule should - // help. - new Job("DsfQueryTests cancel job") { @Override public IStatus run(IProgressMonitor monitor) { //$NON-NLS-1$ - q.cancel(false); - return Status.OK_STATUS; - }}.schedule(1); + + // Block to retrieve data try { q.get(); } catch (CancellationException e) { @@ -155,24 +209,49 @@ public class DsfQueryTests { } @Test - public void cancelBeforeWaitingTest() throws InterruptedException, ExecutionException { + public void cancelWhileWaitingTest() throws InterruptedException, ExecutionException { + final DataRequestMonitor<?>[] rmHolder = new DataRequestMonitor<?>[1]; + final Boolean[] cancelCalled = new Boolean[] { Boolean.FALSE }; + final Query<Integer> q = new Query<Integer>() { @Override protected void execute(final DataRequestMonitor<Integer> rm) { - Assert.fail("Query was cancelled, it should not be called."); //$NON-NLS-1$ - rm.done(); + synchronized (rmHolder) { + rmHolder[0] = rm; + rmHolder.notifyAll(); + } } }; - // Cancel before invoking the query. - q.cancel(false); - - Assert.assertTrue(q.isDone()); - Assert.assertTrue(q.isCancelled()); - // Start the query. fExecutor.execute(q); + + // Wait until the query is started + synchronized (rmHolder) { + while(rmHolder[0] == null) { + rmHolder.wait(); + } + } - // Block to retrieve data + // Add a cancel listener to the query RM + rmHolder[0].addCancelListener(new ICanceledListener() { + + public void requestCanceled(RequestMonitor rm) { + cancelCalled[0] = Boolean.TRUE; + } + }); + + // Cancel running request. + q.cancel(false); + + Assert.assertTrue(cancelCalled[0]); + Assert.assertTrue(rmHolder[0].isCanceled()); + Assert.assertTrue(q.isCancelled()); + Assert.assertFalse(q.isDone()); + + // Complete rm and query. + rmHolder[0].done(); + + // Retrieve data try { q.get(); } catch (CancellationException e) { @@ -183,6 +262,7 @@ public class DsfQueryTests { } Assert.assertTrue("CancellationException should have been thrown", false); //$NON-NLS-1$ } + @Test public void getTimeoutTest() throws InterruptedException, ExecutionException { @@ -194,7 +274,7 @@ public class DsfQueryTests { new DsfRunnable() { public void run() { rm.done(); } }, - 1, TimeUnit.SECONDS); + 60, TimeUnit.SECONDS); } }; |