Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPawel Piech2010-04-23 20:21:03 +0000
committerPawel Piech2010-04-23 20:21:03 +0000
commit8a353a17587ad32197ebb8a094e880ef66d8e8e9 (patch)
tree58a72149c32d49995e485a42c155ebf3891f0998
parentded34fbdbb1196552dc997a6a0b5ac7c0a4398fc (diff)
downloadorg.eclipse.cdt-8a353a17587ad32197ebb8a094e880ef66d8e8e9.tar.gz
org.eclipse.cdt-8a353a17587ad32197ebb8a094e880ef66d8e8e9.tar.xz
org.eclipse.cdt-8a353a17587ad32197ebb8a094e880ef66d8e8e9.zip
Bug 310335 - [concurrent] Query utility does not propagate a cancel requests
-rw-r--r--dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Query.java235
-rw-r--r--dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/DsfQueryTests.java144
2 files changed, 219 insertions, 160 deletions
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Query.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Query.java
index 99eb178d709..84165307078 100644
--- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Query.java
+++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Query.java
@@ -15,9 +15,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import org.eclipse.cdt.dsf.internal.DsfPlugin;
import org.eclipse.core.runtime.CoreException;
+import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.Status;
/**
@@ -54,18 +56,90 @@ import org.eclipse.core.runtime.CoreException;
abstract public class Query<V> extends DsfRunnable
implements Future<V>
{
- /** The synchronization object for this query */
- private final Sync fSync = new Sync();
+ private class QueryRm extends DataRequestMonitor<V> {
+ boolean fExecuted = false;
+
+ boolean fCompleted = false;
+
+ private QueryRm() {
+ super(ImmediateExecutor.getInstance(), null);
+ }
+
+ @Override
+ public synchronized void handleCompleted() {
+ fCompleted = true;
+ notifyAll();
+ }
+
+ public synchronized boolean isCompleted() {
+ return fCompleted;
+ }
+
+ public synchronized boolean setExecuted() {
+ if (fExecuted || isCanceled()) {
+ // already executed or canceled
+ return false;
+ }
+ fExecuted = true;
+ return true;
+ }
+
+ public synchronized boolean isExecuted() {
+ return fExecuted;
+ }
+ };
+
+ private final QueryRm fRm = new QueryRm();
+
/**
* The no-argument constructor
*/
public Query() {}
-
- public V get() throws InterruptedException, ExecutionException { return fSync.doGet(); }
+
+ public V get() throws InterruptedException, ExecutionException {
+ IStatus status;
+ V data;
+ synchronized (fRm) {
+ while (!isDone()) {
+ fRm.wait();
+ }
+ status = fRm.getStatus();
+ data = fRm.getData();
+ }
+
+ if (status.getSeverity() == IStatus.CANCEL) {
+ throw new CancellationException();
+ } else if (status.getSeverity() != IStatus.OK) {
+ throw new ExecutionException(new CoreException(status));
+ }
+ return data;
+ }
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- return fSync.doGet(unit.toNanos(timeout));
+ long timeLeft = unit.toMillis(timeout);
+ long timeoutTime = System.currentTimeMillis() + unit.toMillis(timeout);
+
+ IStatus status;
+ V data;
+ synchronized (fRm) {
+ while (!isDone()) {
+ if (timeLeft <= 0) {
+ throw new TimeoutException();
+ }
+ fRm.wait(timeLeft);
+ timeLeft = timeoutTime - System.currentTimeMillis();
+ }
+ status = fRm.getStatus();
+ data = fRm.getData();
+ }
+
+ if (status.getSeverity() == IStatus.CANCEL) {
+ throw new CancellationException();
+ } else if (status.getSeverity() != IStatus.OK) {
+ throw new ExecutionException(new CoreException(status));
+ }
+ return data;
}
/**
@@ -73,139 +147,44 @@ abstract public class Query<V> extends DsfRunnable
* if set.
*/
public boolean cancel(boolean mayInterruptIfRunning) {
- return fSync.doCancel();
+ boolean completed = false;
+ synchronized (fRm) {
+ completed = fRm.isCompleted();
+ if (!completed) {
+ fRm.cancel();
+ }
+ }
+ return !completed;
}
- public boolean isCancelled() { return fSync.doIsCancelled(); }
+ public boolean isCancelled() { return fRm.isCanceled(); }
- public boolean isDone() { return fSync.doIsDone(); }
-
+ public boolean isDone() {
+ synchronized (fRm) {
+ return fRm.isCompleted() || (fRm.isCanceled() && !fRm.isExecuted());
+ }
+ }
- protected void doneException(Throwable t) {
- fSync.doSetException(t);
- }
abstract protected void execute(DataRequestMonitor<V> rm);
public void run() {
- if (fSync.doRun()) {
- try {
- /*
- * Create the executor which is going to handle the completion of the
- * request monitor. Normally a DSF executor is supplied here which
- * causes the request monitor to be invoked in a new dispatch loop.
- * But since the query is a synchronization object, it can handle
- * the completion of the request in any thread.
- * Avoiding the use of a DSF executor is very useful because queries are
- * meant to be used by clients calling from non-dispatch thread, and there
- * is a chance that a client may execute a query just as a session is being
- * shut down. In that case, the DSF executor may throw a
- * RejectedExecutionException which would have to be handled by the query.
- */
- execute(new DataRequestMonitor<V>(ImmediateExecutor.getInstance(), null) {
- @Override
- public void handleCompleted() {
- if (isSuccess()) fSync.doSet(getData());
- else fSync.doSetException(new CoreException(getStatus()));
- }
- });
- } catch(Throwable t) {
- /*
- * Catching the exception here will only work if the exception
- * happens within the execute. It will not work in cases when
- * the execute submits other runnables, and the other runnables
- * encounter the exception.
- */
- fSync.doSetException(t);
-
- /*
- * Since we caught the exception, it will not be logged by
- * DefaultDsfExecutable.afterExecution(). So log it here.
- */
- DefaultDsfExecutor.logException(t);
- }
+ if (fRm.setExecuted()) {
+ execute(fRm);
}
}
-
- @SuppressWarnings("serial")
- final class Sync extends AbstractQueuedSynchronizer {
- private static final int STATE_RUNNING = 1;
- private static final int STATE_DONE = 2;
- private static final int STATE_CANCELLED = 4;
-
- private V fResult;
- private Throwable fException;
-
- private boolean ranOrCancelled(int state) {
- return (state & (STATE_DONE | STATE_CANCELLED)) != 0;
- }
-
- @Override
- protected int tryAcquireShared(int ignore) {
- return doIsDone()? 1 : -1;
- }
-
- @Override
- protected boolean tryReleaseShared(int ignore) {
- return true;
- }
-
- boolean doIsCancelled() {
- return getState() == STATE_CANCELLED;
- }
-
- boolean doIsDone() {
- return ranOrCancelled(getState());
- }
-
- V doGet() throws InterruptedException, ExecutionException {
- acquireSharedInterruptibly(0);
- if (getState() == STATE_CANCELLED) throw new CancellationException();
- if (fException != null) throw new ExecutionException(fException);
- return fResult;
- }
-
- V doGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
- if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException();
- if (getState() == STATE_CANCELLED) throw new CancellationException();
- if (fException != null) throw new ExecutionException(fException);
- return fResult;
- }
- void doSet(V v) {
- while(true) {
- int s = getState();
- if (ranOrCancelled(s)) return;
- if (compareAndSetState(s, STATE_DONE)) break;
- }
- fResult = v;
- releaseShared(0);
- }
-
- void doSetException(Throwable t) {
- while(true) {
- int s = getState();
- if (ranOrCancelled(s)) return;
- if (compareAndSetState(s, STATE_DONE)) break;
- }
- fException = t;
- fResult = null;
- releaseShared(0);
- }
-
- boolean doCancel() {
- while(true) {
- int s = getState();
- if (ranOrCancelled(s)) return false;
- if (compareAndSetState(s, STATE_CANCELLED)) break;
- }
- releaseShared(0);
- return true;
- }
+ /**
+ * Completes the query with the given exception.
+ *
+ * @deprecated Query implementations should call the request monitor to
+ * set the exception status directly.
+ */
+ protected void doneException(Throwable t) {
+ fRm.setStatus(new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfStatusConstants.INTERNAL_ERROR, "Exception", t)); //$NON-NLS-1$
+ fRm.done();
+ }
- boolean doRun() {
- return compareAndSetState(0, STATE_RUNNING);
- }
- }
+
}
diff --git a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/DsfQueryTests.java b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/DsfQueryTests.java
index 6c78e535019..7da7f99aea4 100644
--- a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/DsfQueryTests.java
+++ b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/DsfQueryTests.java
@@ -19,13 +19,14 @@ import junit.framework.Assert;
import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor;
import org.eclipse.cdt.dsf.concurrent.DsfRunnable;
+import org.eclipse.cdt.dsf.concurrent.IDsfStatusConstants;
import org.eclipse.cdt.dsf.concurrent.Query;
+import org.eclipse.cdt.dsf.concurrent.RequestMonitor;
+import org.eclipse.cdt.dsf.concurrent.RequestMonitor.ICanceledListener;
import org.eclipse.cdt.tests.dsf.DsfTestPlugin;
import org.eclipse.cdt.tests.dsf.TestDsfExecutor;
-import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
-import org.eclipse.core.runtime.jobs.Job;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -76,6 +77,65 @@ public class DsfQueryTests {
}
@Test
+ public void getErrorTest() throws InterruptedException, ExecutionException {
+ final String error_message = "Test Error";
+
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ rm.setStatus(new Status(IStatus.ERROR, DsfTestPlugin.PLUGIN_ID, IDsfStatusConstants.INTERNAL_ERROR, error_message, null)); //$NON-NLS-1$
+ rm.done();
+ }
+ };
+
+ // Check initial state
+ Assert.assertTrue(!q.isDone());
+ Assert.assertTrue(!q.isCancelled());
+
+ fExecutor.execute(q);
+
+ try {
+ q.get();
+ Assert.fail("Expected exception");
+ } catch (ExecutionException e) {
+ Assert.assertEquals(e.getCause().getMessage(), error_message);
+ }
+
+ // Check final state
+ Assert.assertTrue(q.isDone());
+ Assert.assertTrue(!q.isCancelled());
+
+ }
+
+ @Test
+ public void doneExceptionTest() throws InterruptedException, ExecutionException {
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ doneException(new Throwable());
+ }
+ };
+
+ // Check initial state
+ Assert.assertTrue(!q.isDone());
+ Assert.assertTrue(!q.isCancelled());
+
+ fExecutor.execute(q);
+
+ try {
+ q.get();
+ Assert.fail("Expected exception");
+ } catch (ExecutionException e) {
+ }
+
+ // Check final state
+ Assert.assertTrue(q.isDone());
+ Assert.assertTrue(!q.isCancelled());
+
+ }
+
+
+ @Test
public void getWithMultipleDispatchesTest() throws InterruptedException, ExecutionException {
Query<Integer> q = new Query<Integer>() {
@Override
@@ -117,32 +177,26 @@ public class DsfQueryTests {
}
@Test
- public void cancelWhileWaitingTest() throws InterruptedException, ExecutionException {
+ public void cancelBeforeWaitingTest() throws InterruptedException, ExecutionException {
final Query<Integer> q = new Query<Integer>() {
- @Override
- protected void execute(final DataRequestMonitor<Integer> rm) {
- // Call done with a delay of 1 second, to avoid stalling the tests.
- fExecutor.schedule(
- new DsfRunnable() {
- public void run() { rm.done(); }
- },
- 1, TimeUnit.SECONDS);
+ @Override protected void execute(final DataRequestMonitor<Integer> rm) {
+ Assert.fail("Query was cancelled, it should not be called."); //$NON-NLS-1$
+ rm.done();
}
};
+
+ // Cancel before invoking the query.
+ q.cancel(false);
- fExecutor.execute(q);
+ Assert.assertTrue(q.isDone());
+ Assert.assertTrue(q.isCancelled());
- // Note: no point in checking isDone() and isCancelled() here, because
- // the value could change on timing.
+ // Start the query.
+ fExecutor.execute(q);
- // This does not really guarantee that the cancel will be called after
- // the call to Fugure.get(), but the 1ms delay in call to schedule should
- // help.
- new Job("DsfQueryTests cancel job") { @Override public IStatus run(IProgressMonitor monitor) { //$NON-NLS-1$
- q.cancel(false);
- return Status.OK_STATUS;
- }}.schedule(1);
+
+ // Block to retrieve data
try {
q.get();
} catch (CancellationException e) {
@@ -155,24 +209,49 @@ public class DsfQueryTests {
}
@Test
- public void cancelBeforeWaitingTest() throws InterruptedException, ExecutionException {
+ public void cancelWhileWaitingTest() throws InterruptedException, ExecutionException {
+ final DataRequestMonitor<?>[] rmHolder = new DataRequestMonitor<?>[1];
+ final Boolean[] cancelCalled = new Boolean[] { Boolean.FALSE };
+
final Query<Integer> q = new Query<Integer>() {
@Override protected void execute(final DataRequestMonitor<Integer> rm) {
- Assert.fail("Query was cancelled, it should not be called."); //$NON-NLS-1$
- rm.done();
+ synchronized (rmHolder) {
+ rmHolder[0] = rm;
+ rmHolder.notifyAll();
+ }
}
};
- // Cancel before invoking the query.
- q.cancel(false);
-
- Assert.assertTrue(q.isDone());
- Assert.assertTrue(q.isCancelled());
-
// Start the query.
fExecutor.execute(q);
+
+ // Wait until the query is started
+ synchronized (rmHolder) {
+ while(rmHolder[0] == null) {
+ rmHolder.wait();
+ }
+ }
- // Block to retrieve data
+ // Add a cancel listener to the query RM
+ rmHolder[0].addCancelListener(new ICanceledListener() {
+
+ public void requestCanceled(RequestMonitor rm) {
+ cancelCalled[0] = Boolean.TRUE;
+ }
+ });
+
+ // Cancel running request.
+ q.cancel(false);
+
+ Assert.assertTrue(cancelCalled[0]);
+ Assert.assertTrue(rmHolder[0].isCanceled());
+ Assert.assertTrue(q.isCancelled());
+ Assert.assertFalse(q.isDone());
+
+ // Complete rm and query.
+ rmHolder[0].done();
+
+ // Retrieve data
try {
q.get();
} catch (CancellationException e) {
@@ -183,6 +262,7 @@ public class DsfQueryTests {
}
Assert.assertTrue("CancellationException should have been thrown", false); //$NON-NLS-1$
}
+
@Test
public void getTimeoutTest() throws InterruptedException, ExecutionException {
@@ -194,7 +274,7 @@ public class DsfQueryTests {
new DsfRunnable() {
public void run() { rm.done(); }
},
- 1, TimeUnit.SECONDS);
+ 60, TimeUnit.SECONDS);
}
};

Back to the top