diff options
author | Roberto E. Escobar | 2011-10-28 06:21:25 +0000 |
---|---|---|
committer | Roberto E. Escobar | 2011-10-28 06:21:25 +0000 |
commit | fe8d78ae9ccae47388e5e8709ed11f8e1e53d9cf (patch) | |
tree | e60bd4bc088a759c8375850a32d794edc67ed5f9 | |
parent | 5058815083f24a16e3aeac999b0f9a0f8d9bb730 (diff) | |
download | org.eclipse.osee-fe8d78ae9ccae47388e5e8709ed11f8e1e53d9cf.tar.gz org.eclipse.osee-fe8d78ae9ccae47388e5e8709ed11f8e1e53d9cf.tar.xz org.eclipse.osee-fe8d78ae9ccae47388e5e8709ed11f8e1e53d9cf.zip |
feature[ats_18K4T]: Add cancel to searches
18 files changed, 670 insertions, 111 deletions
diff --git a/plugins/org.eclipse.osee.executor.admin.test/META-INF/MANIFEST.MF b/plugins/org.eclipse.osee.executor.admin.test/META-INF/MANIFEST.MF index 7abc66e61b0..2ae48a2f776 100644 --- a/plugins/org.eclipse.osee.executor.admin.test/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.osee.executor.admin.test/META-INF/MANIFEST.MF @@ -6,6 +6,7 @@ Bundle-Version: 0.9.9.qualifier Bundle-Vendor: Eclipse Open System Engineering Environment Fragment-Host: org.eclipse.osee.executor.admin Bundle-RequiredExecutionEnvironment: JavaSE-1.6 -Import-Package: org.junit;version="4.8.2", +Import-Package: org.eclipse.osee.framework.jdk.core.type, + org.junit;version="4.8.2", org.junit.runner;version="4.8.2", org.junit.runners;version="4.8.2" diff --git a/plugins/org.eclipse.osee.executor.admin.test/build.properties b/plugins/org.eclipse.osee.executor.admin.test/build.properties index 5b9d2918ffc..edf08e91d3b 100644 --- a/plugins/org.eclipse.osee.executor.admin.test/build.properties +++ b/plugins/org.eclipse.osee.executor.admin.test/build.properties @@ -2,4 +2,5 @@ source.. = src/ output.. = bin/ bin.includes = META-INF/,\ . -additional.bundles = org.junit +additional.bundles = org.junit,\ + org.eclipse.osee.framework.jdk.core diff --git a/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutionCallbackTest.java b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutionCallbackTest.java new file mode 100644 index 00000000000..9299923f0c5 --- /dev/null +++ b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutionCallbackTest.java @@ -0,0 +1,146 @@ +/******************************************************************************* + * Copyright (c) 2010 Boeing. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Boeing - initial API and implementation + *******************************************************************************/ +package org.eclipse.osee.executor.admin.internal; + +import java.util.HashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.eclipse.osee.executor.admin.CancellableCallable; +import org.eclipse.osee.executor.admin.mock.MockEventService; +import org.eclipse.osee.executor.admin.mock.MockExecutionCallback; +import org.eclipse.osee.executor.admin.mock.MockLog; +import org.junit.Assert; +import org.junit.Test; + +/** + * @author Roberto E. Escobar + */ +public class ExecutionCallbackTest { + + @Test + public void testCallbackOnSuccess() throws Exception { + ExecutorAdminImpl admin = new ExecutorAdminImpl(); + admin.setLogger(new MockLog()); + admin.setEventService(new MockEventService()); + admin.start(new HashMap<String, Object>()); + + final String expected = "Was Called"; + + MockExecutionCallback<String> callback = new MockExecutionCallback<String>(); + Callable<String> callable = new Callable<String>() { + + @Override + public String call() throws Exception { + return expected; + } + + }; + Future<String> future = admin.schedule(callable, callback); + String actual = future.get(); + + Assert.assertEquals(expected, actual); + + Assert.assertTrue(callback.wasOnSuccess()); + Assert.assertFalse(callback.wasOnCancelled()); + Assert.assertFalse(callback.wasOnFailure()); + + Assert.assertEquals(expected, callback.getResult()); + Assert.assertNull(callback.getThrowable()); + } + + @Test + public void testCallbackOnFailure() throws Exception { + ExecutorAdminImpl admin = new ExecutorAdminImpl(); + admin.setLogger(new MockLog()); + admin.setEventService(new MockEventService()); + admin.start(new HashMap<String, Object>()); + + final Exception expectedException = new IllegalStateException(); + MockExecutionCallback<String> callback = new MockExecutionCallback<String>(); + Callable<String> callable = new Callable<String>() { + + @Override + public String call() throws Exception { + throw expectedException; + } + + }; + Future<String> future = admin.schedule(callable, callback); + + try { + future.get(); + Assert.assertTrue("An exception should have been thrown", false); + } catch (Exception ex) { + Assert.assertEquals(ExecutionException.class, ex.getClass()); + Assert.assertEquals(expectedException, ex.getCause()); + } + + Assert.assertFalse(callback.wasOnSuccess()); + Assert.assertFalse(callback.wasOnCancelled()); + Assert.assertTrue(callback.wasOnFailure()); + + Assert.assertNull(callback.getResult()); + Assert.assertEquals(IllegalStateException.class, callback.getThrowable().getClass()); + } + + @Test + public void testCallbackOnCancel() throws Exception { + ExecutorAdminImpl admin = new ExecutorAdminImpl(); + admin.setLogger(new MockLog()); + admin.setEventService(new MockEventService()); + admin.start(new HashMap<String, Object>()); + + final String results = "results"; + + MockExecutionCallback<String> callback = new MockExecutionCallback<String>(); + + TestCancellableCallable callable = new TestCancellableCallable(results); + Future<String> future = admin.schedule(callable, callback); + future.cancel(true); + + Assert.assertFalse(callback.wasOnSuccess()); + Assert.assertTrue(callback.wasOnCancelled()); + Assert.assertFalse(callback.wasOnFailure()); + + Assert.assertNull(callback.getResult()); + Assert.assertNull(callback.getThrowable()); + + Assert.assertEquals(true, callable.isCancelled()); + Assert.assertEquals(true, future.isCancelled()); + + try { + future.get(); + Assert.assertTrue("An exception should have been thrown", false); + } catch (Exception ex) { + Assert.assertEquals(CancellationException.class, ex.getClass()); + } + } + + private class TestCancellableCallable extends CancellableCallable<String> { + + private final String results; + + public TestCancellableCallable(String results) { + this.results = results; + } + + @Override + public String call() throws Exception { + while (!isCancelled()) { + checkForCancelled(); + // System.out.println("working..."); + } + return results; + } + } +} diff --git a/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminIternalTestSuite.java b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminIternalTestSuite.java index 5569d96927b..08f68af3046 100644 --- a/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminIternalTestSuite.java +++ b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminIternalTestSuite.java @@ -14,7 +14,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Suite; @RunWith(Suite.class) -@Suite.SuiteClasses({ExecutorAdminTest.class}) +@Suite.SuiteClasses({ExecutorAdminTest.class, ExecutionCallbackTest.class}) public class ExecutorAdminIternalTestSuite { // } diff --git a/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/mock/MockExecutionCallback.java b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/mock/MockExecutionCallback.java new file mode 100644 index 00000000000..165c165cf7b --- /dev/null +++ b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/mock/MockExecutionCallback.java @@ -0,0 +1,63 @@ +/******************************************************************************* + * Copyright (c) 2004, 2007 Boeing. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Boeing - initial API and implementation + *******************************************************************************/ +package org.eclipse.osee.executor.admin.mock; + +import org.eclipse.osee.executor.admin.ExecutionCallback; + +/** + * @author Roberto E. Escobar + */ +public class MockExecutionCallback<T> implements ExecutionCallback<T> { + + private boolean wasOnCancelled; + private boolean wasOnSuccess; + private boolean wasOnFailure; + private Throwable throwable; + private T result; + + @Override + public void onCancelled() { + this.wasOnCancelled = true; + } + + @Override + public void onSuccess(T result) { + this.result = result; + this.wasOnSuccess = true; + } + + @Override + public void onFailure(Throwable throwable) { + this.wasOnFailure = true; + this.throwable = throwable; + } + + public boolean wasOnCancelled() { + return wasOnCancelled; + } + + public boolean wasOnSuccess() { + return wasOnSuccess; + } + + public boolean wasOnFailure() { + return wasOnFailure; + } + + public Throwable getThrowable() { + return throwable; + } + + public T getResult() { + return result; + } + +}
\ No newline at end of file diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/CancellableCallable.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/CancellableCallable.java new file mode 100644 index 00000000000..ebf1e6657e5 --- /dev/null +++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/CancellableCallable.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * Copyright (c) 2004, 2007 Boeing. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Boeing - initial API and implementation + *******************************************************************************/ +package org.eclipse.osee.executor.admin; + +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; + +/** + * @author Roberto E. Escobar + */ +public abstract class CancellableCallable<T> implements Callable<T>, HasCancellation { + + private volatile boolean cancelled; + + protected CancellableCallable() { + super(); + cancelled = false; + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public void setCancel(boolean isCancelled) { + cancelled = isCancelled; + } + + @Override + public void checkForCancelled() throws CancellationException { + if (isCancelled()) { + throw new CancellationException(); + } + } +} diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/HasCancellation.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/HasCancellation.java new file mode 100644 index 00000000000..b13f69b2f42 --- /dev/null +++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/HasCancellation.java @@ -0,0 +1,25 @@ +/******************************************************************************* + * Copyright (c) 2004, 2007 Boeing. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Boeing - initial API and implementation + *******************************************************************************/ +package org.eclipse.osee.executor.admin; + +import java.util.concurrent.CancellationException; + +/** + * @author Roberto E. Escobar + */ +public interface HasCancellation { + + boolean isCancelled(); + + void setCancel(boolean isCancelled); + + void checkForCancelled() throws CancellationException; +} diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/CallableWithCallbackImpl.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/CallableWithCallbackImpl.java index ef03459407b..c44a0759ea0 100644 --- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/CallableWithCallbackImpl.java +++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/CallableWithCallbackImpl.java @@ -11,13 +11,15 @@ package org.eclipse.osee.executor.admin.internal; import java.util.concurrent.Callable; +import org.eclipse.osee.executor.admin.CancellableCallable; import org.eclipse.osee.executor.admin.ExecutionCallback; +import org.eclipse.osee.executor.admin.HasCancellation; import org.eclipse.osee.executor.admin.HasExecutionCallback; /** * @author Roberto E. Escobar */ -public class CallableWithCallbackImpl<T> implements Callable<T>, HasExecutionCallback<T> { +public class CallableWithCallbackImpl<T> extends CancellableCallable<T> implements HasExecutionCallback<T> { private final Callable<T> innerWorker; private final ExecutionCallback<T> callback; @@ -29,6 +31,7 @@ public class CallableWithCallbackImpl<T> implements Callable<T>, HasExecutionCal @Override public T call() throws Exception { + checkForCancelled(); return innerWorker.call(); } @@ -36,4 +39,21 @@ public class CallableWithCallbackImpl<T> implements Callable<T>, HasExecutionCal public ExecutionCallback<T> getExecutionCallback() { return callback; } + + @Override + public boolean isCancelled() { + boolean result = super.isCancelled(); + if (innerWorker instanceof HasCancellation) { + result = ((HasCancellation) innerWorker).isCancelled(); + } + return result; + } + + @Override + public void setCancel(boolean isCancelled) { + super.setCancel(isCancelled); + if (innerWorker instanceof HasCancellation) { + ((HasCancellation) innerWorker).setCancel(isCancelled); + } + } } diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceImpl.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceImpl.java index 301a1277d79..1ace76410d5 100644 --- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceImpl.java +++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceImpl.java @@ -11,9 +11,8 @@ package org.eclipse.osee.executor.admin.internal; import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -48,52 +47,6 @@ public class ExecutorServiceImpl extends ThreadPoolExecutor { } @SuppressWarnings("unchecked") - @Override - public <T> Future<T> submit(Callable<T> task) { - Future<T> toReturn = null; - ExecutionCallback<T> callback = getCallBack(task); - if (callback != null) { - FutureTask<T> fTask = new FutureTaskWithCallback<T>(getLogger(), task, callback); - toReturn = (Future<T>) super.submit(fTask); - } else { - toReturn = super.submit(task); - } - return toReturn; - } - - @Override - public <T> Future<T> submit(Runnable task, T result) { - Runnable toRun = task; - ExecutionCallback<T> callback = getCallBack(task); - if (callback != null) { - toRun = new FutureTaskWithCallback<T>(getLogger(), task, result, callback); - } - return super.submit(toRun, result); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - public Future<?> submit(Runnable task) { - Runnable toRun = task; - ExecutionCallback<?> callback = getCallBack(task); - if (callback != null) { - toRun = new FutureTaskWithCallback(getLogger(), task, null, callback); - } - return super.submit(toRun); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - public void execute(Runnable command) { - Runnable toRun = command; - ExecutionCallback<?> callback = getCallBack(command); - if (callback != null) { - toRun = new FutureTaskWithCallback(getLogger(), command, null, callback); - } - super.execute(toRun); - } - - @SuppressWarnings("unchecked") private <V> ExecutionCallback<V> getCallBack(Object object) { ExecutionCallback<V> callback = null; if (object instanceof HasExecutionCallback) { @@ -102,4 +55,16 @@ public class ExecutorServiceImpl extends ThreadPoolExecutor { } return callback; } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { + ExecutionCallback<T> callback = getCallBack(runnable); + return new FutureTaskWithCallback<T>(getLogger(), runnable, value, callback); + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { + ExecutionCallback<T> callback = getCallBack(callable); + return new FutureTaskWithCallback<T>(getLogger(), callable, callback); + } } diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/FutureTaskWithCallback.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/FutureTaskWithCallback.java index 34851d77ba5..a1b6f6f1d3d 100644 --- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/FutureTaskWithCallback.java +++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/FutureTaskWithCallback.java @@ -11,32 +11,44 @@ package org.eclipse.osee.executor.admin.internal; import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; import org.eclipse.osee.executor.admin.ExecutionCallback; +import org.eclipse.osee.executor.admin.HasCancellation; import org.eclipse.osee.executor.admin.HasExecutionCallback; import org.eclipse.osee.logger.Log; /** * @author Roberto E. Escobar */ -public class FutureTaskWithCallback<T> extends FutureTask<T> implements HasExecutionCallback<T> { +public class FutureTaskWithCallback<T> implements RunnableFuture<T>, HasExecutionCallback<T> { - private final Runnable runnable; - private final ExecutionCallback<T> callback; private final Log logger; + private final ExecutionCallback<T> callback; + private final Sync sync; + private final Runnable runnable; public FutureTaskWithCallback(Log logger, Callable<T> callable, ExecutionCallback<T> callback) { - super(callable); + if (callable == null) { + throw new NullPointerException(); + } + sync = new Sync(callable); + this.logger = logger; this.callback = callback; this.runnable = null; } public FutureTaskWithCallback(Log logger, Runnable runnable, T result, ExecutionCallback<T> callback) { - super(runnable, result); + this.runnable = runnable; + sync = new Sync(Executors.callable(runnable, result)); this.logger = logger; this.callback = callback; - this.runnable = runnable; } @Override @@ -44,47 +56,278 @@ public class FutureTaskWithCallback<T> extends FutureTask<T> implements HasExecu return callback; } - private String getWorkerName() { - String name; - if (callback != null) { - name = callback.toString(); - } else if (runnable != null) { - name = runnable.toString(); - } else { - name = this.toString(); - } - return name; + @Override + public boolean isCancelled() { + return sync.innerIsCancelled(); + } + + @Override + public boolean isDone() { + return sync.innerIsDone(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return sync.innerCancel(mayInterruptIfRunning); } @Override + public T get() throws InterruptedException, ExecutionException { + return sync.innerGet(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return sync.innerGet(unit.toNanos(timeout)); + } + protected void done() { - super.done(); - try { - callback.onSuccess(get()); - } catch (Throwable ex) { - callback.onCancelled(); - // logger.error(ex, "Error onSuccess callback for - [%s]", getWorkerName()); - } + // Do nothing + } + + protected void set(T v) { + sync.innerSet(v); + } + + protected void setException(Throwable t) { + sync.innerSetException(t); } @Override - protected void setException(Throwable throwable) { - super.setException(throwable); - try { - callback.onFailure(throwable); - } catch (Throwable ex) { - logger.error(ex, "Error onFailure callback for - [%s]", getWorkerName()); + public void run() { + sync.innerRun(); + } + + protected boolean runAndReset() { + return sync.innerRunAndReset(); + } + + protected void notifyOnSuccess(T result) { + ExecutionCallback<T> callback = getExecutionCallback(); + if (callback != null) { + try { + callback.onSuccess(result); + } catch (Throwable th) { + handleNotificationError(th, "success"); + } } } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean result = super.cancel(mayInterruptIfRunning); - try { - callback.onCancelled(); - } catch (Throwable ex) { - logger.error(ex, "Error onCancel callback for - [%s]", getWorkerName()); + protected void notifyOnCancelled() { + ExecutionCallback<T> callback = getExecutionCallback(); + if (callback != null) { + try { + callback.onCancelled(); + } catch (Throwable th) { + handleNotificationError(th, "cancelled"); + } + } + } + + protected void notifyOnFailure(Throwable throwable) { + ExecutionCallback<T> callback = getExecutionCallback(); + if (callback != null) { + try { + callback.onFailure(throwable); + } catch (Throwable th) { + handleNotificationError(th, "failure"); + } } - return result; } + + protected void handleNotificationError(Throwable th, String value) { + logger.error(th, "Error during on %s notification [%s]", value, callback.toString()); + } + + private final class Sync extends AbstractQueuedSynchronizer { + + private static final long serialVersionUID = 592282723700675891L; + + /** State value representing that task is running */ + private static final int RUNNING = 1; + /** State value representing that task ran */ + private static final int RAN = 2; + /** State value representing that task was cancelled */ + private static final int CANCELLED = 4; + + /** The underlying callable */ + private final Callable<T> callable; + /** The result to return from get() */ + private T result; + /** The exception to throw from get() */ + private Throwable exception; + + /** + * The thread running task. When nulled after set/cancel, this indicates that the results are accessible. Must be + * volatile, to ensure visibility upon completion. + */ + private volatile Thread runner; + + Sync(Callable<T> callable) { + this.callable = callable; + } + + private boolean ranOrCancelled(int state) { + return (state & (RAN | CANCELLED)) != 0; + } + + @Override + protected int tryAcquireShared(int ignore) { + return innerIsDone() ? 1 : -1; + } + + @Override + protected boolean tryReleaseShared(int ignore) { + runner = null; + return true; + } + + boolean innerIsCancelled() { + return getState() == CANCELLED; + } + + boolean innerIsDone() { + return ranOrCancelled(getState()) && runner == null; + } + + T innerGet() throws InterruptedException, ExecutionException { + acquireSharedInterruptibly(0); + if (getState() == CANCELLED) { + throw new CancellationException(); + } + if (exception != null) { + throw new ExecutionException(exception); + } + + return result; + } + + T innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { + if (!tryAcquireSharedNanos(0, nanosTimeout)) { + throw new TimeoutException(); + } + if (getState() == CANCELLED) { + throw new CancellationException(); + } + if (exception != null) { + throw new ExecutionException(exception); + } + return result; + } + + void innerSet(T v) { + for (;;) { + int s = getState(); + if (s == RAN) { + return; + } + if (s == CANCELLED) { + // aggressively release to set runner to null, + // in case we are racing with a cancel request + // that will try to interrupt runner + releaseShared(0); + notifyOnCancelled(); + return; + } + if (compareAndSetState(s, RAN)) { + result = v; + releaseShared(0); + done(); + notifyOnSuccess(result); + return; + } + } + } + + void innerSetException(Throwable throwable) { + for (;;) { + int s = getState(); + if (s == RAN) { + return; + } + if (s == CANCELLED) { + // aggressively release to set runner to null, + // in case we are racing with a cancel request + // that will try to interrupt runner + releaseShared(0); + notifyOnCancelled(); + return; + } + if (compareAndSetState(s, RAN)) { + exception = throwable; + result = null; + releaseShared(0); + done(); + notifyOnFailure(throwable); + return; + } + } + } + + boolean innerCancel(boolean mayInterruptIfRunning) { + for (;;) { + int s = getState(); + if (ranOrCancelled(s)) { + return false; + } + if (compareAndSetState(s, CANCELLED)) { + break; + } + } + + if (callable instanceof HasCancellation) { + HasCancellation cancellable = (HasCancellation) callable; + cancellable.setCancel(true); + } + if (runnable instanceof HasCancellation) { + HasCancellation cancellable = (HasCancellation) runnable; + cancellable.setCancel(true); + } + + if (mayInterruptIfRunning) { + Thread r = runner; + if (r != null) { + r.interrupt(); + } + } + releaseShared(0); + done(); + notifyOnCancelled(); + return true; + } + + void innerRun() { + if (!compareAndSetState(0, RUNNING)) { + return; + } + try { + runner = Thread.currentThread(); + if (getState() == RUNNING) { + innerSet(callable.call()); + } else { + releaseShared(0); // cancel + } + } catch (Throwable ex) { + innerSetException(ex); + } + } + + boolean innerRunAndReset() { + if (!compareAndSetState(0, RUNNING)) { + return false; + } + try { + runner = Thread.currentThread(); + if (getState() == RUNNING) { + callable.call(); // don't set result + } + runner = null; + return compareAndSetState(RUNNING, 0); + } catch (Throwable ex) { + innerSetException(ex); + return false; + } + } + } + } diff --git a/plugins/org.eclipse.osee.framework.database/src/org/eclipse/osee/framework/database/core/IOseeStatement.java b/plugins/org.eclipse.osee.framework.database/src/org/eclipse/osee/framework/database/core/IOseeStatement.java index 87ed7ffa5c5..3a0c1d32817 100644 --- a/plugins/org.eclipse.osee.framework.database/src/org/eclipse/osee/framework/database/core/IOseeStatement.java +++ b/plugins/org.eclipse.osee.framework.database/src/org/eclipse/osee/framework/database/core/IOseeStatement.java @@ -47,6 +47,8 @@ public interface IOseeStatement extends Closeable { @Override void close(); + void cancel() throws OseeCoreException; + InputStream getBinaryStream(String columnName) throws OseeCoreException; InputStream getAsciiStream(String columnName) throws OseeCoreException; diff --git a/plugins/org.eclipse.osee.framework.database/src/org/eclipse/osee/framework/database/internal/core/OseeStatementImpl.java b/plugins/org.eclipse.osee.framework.database/src/org/eclipse/osee/framework/database/internal/core/OseeStatementImpl.java index bc8ac254fed..a2d48ce2bf8 100644 --- a/plugins/org.eclipse.osee.framework.database/src/org/eclipse/osee/framework/database/internal/core/OseeStatementImpl.java +++ b/plugins/org.eclipse.osee.framework.database/src/org/eclipse/osee/framework/database/internal/core/OseeStatementImpl.java @@ -452,4 +452,18 @@ public final class OseeStatementImpl implements IOseeStatement { OseeExceptions.wrapAndThrow(ex); } } + + @Override + public void cancel() throws OseeCoreException { + try { + if (preparedStatement != null) { + preparedStatement.cancel(); + } + if (callableStatement != null) { + callableStatement.cancel(); + } + } catch (SQLException ex) { + OseeExceptions.wrapAndThrow(ex); + } + } }
\ No newline at end of file diff --git a/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/ds/DataLoader.java b/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/ds/DataLoader.java index 1304b5941e1..ecd3b13167f 100644 --- a/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/ds/DataLoader.java +++ b/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/ds/DataLoader.java @@ -10,6 +10,7 @@ *******************************************************************************/ package org.eclipse.osee.orcs.core.ds; +import org.eclipse.osee.executor.admin.HasCancellation; import org.eclipse.osee.framework.core.exception.OseeCoreException; /** @@ -17,8 +18,8 @@ import org.eclipse.osee.framework.core.exception.OseeCoreException; */ public interface DataLoader { - void loadArtifacts(ArtifactRowHandler handler, QueryContext queryContext, LoadOptions loadOptions, RelationRowHandlerFactory relationRowHandlerFactory, AttributeRowHandlerFactory attributeRowHandlerFactory) throws OseeCoreException; + void loadArtifacts(HasCancellation cancellation, ArtifactRowHandler handler, QueryContext queryContext, LoadOptions loadOptions, RelationRowHandlerFactory relationRowHandlerFactory, AttributeRowHandlerFactory attributeRowHandlerFactory) throws OseeCoreException; - int countArtifacts(QueryContext queryContext) throws OseeCoreException; + int countArtifacts(HasCancellation cancellation, QueryContext queryContext) throws OseeCoreException; } diff --git a/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/OrcsObjectLoader.java b/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/OrcsObjectLoader.java index 08171a3531a..5eaf925995b 100644 --- a/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/OrcsObjectLoader.java +++ b/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/OrcsObjectLoader.java @@ -14,6 +14,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.eclipse.osee.executor.admin.HasCancellation; import org.eclipse.osee.framework.core.exception.OseeCoreException; import org.eclipse.osee.framework.core.model.cache.ArtifactTypeCache; import org.eclipse.osee.framework.core.model.cache.BranchCache; @@ -60,11 +61,11 @@ public class OrcsObjectLoader { this.branchCache = branchCache; } - public int countObjects(QueryContext queryContext) throws OseeCoreException { - return dataLoader.countArtifacts(queryContext); + public int countObjects(HasCancellation cancellation, QueryContext queryContext) throws OseeCoreException { + return dataLoader.countArtifacts(cancellation, queryContext); } - public List<ReadableArtifact> load(QueryContext queryContext, LoadOptions loadOptions, SessionContext sessionContext) throws OseeCoreException { + public List<ReadableArtifact> load(HasCancellation cancellation, QueryContext queryContext, LoadOptions loadOptions, SessionContext sessionContext) throws OseeCoreException { List<ReadableArtifact> artifacts = new ArrayList<ReadableArtifact>(); @@ -73,7 +74,8 @@ public class OrcsObjectLoader { ArtifactRowHandler artifactRowHandler = new ArtifactRowMapper(sessionContext, branchCache, artifactTypeCache, artifactFactory, artifactHandler); - dataLoader.loadArtifacts(artifactRowHandler, queryContext, loadOptions, artifactHandler, artifactHandler); + dataLoader.loadArtifacts(cancellation, artifactRowHandler, queryContext, loadOptions, artifactHandler, + artifactHandler); return artifacts; } diff --git a/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchCallable.java b/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchCallable.java index 7785affc689..dc8489c6f31 100644 --- a/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchCallable.java +++ b/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchCallable.java @@ -12,7 +12,7 @@ package org.eclipse.osee.orcs.core.internal.search.callable; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; +import org.eclipse.osee.executor.admin.CancellableCallable; import org.eclipse.osee.framework.core.enums.LoadLevel; import org.eclipse.osee.orcs.core.ds.CriteriaSet; import org.eclipse.osee.orcs.core.ds.LoadOptions; @@ -31,7 +31,7 @@ import org.eclipse.osee.orcs.search.ResultSet; /** * @author Roberto E. Escobar */ -public class SearchCallable implements Callable<ResultSet<ReadableArtifact>> { +public class SearchCallable extends CancellableCallable<ResultSet<ReadableArtifact>> { private final QueryEngine queryEngine; private final OrcsObjectLoader objectLoader; @@ -54,15 +54,18 @@ public class SearchCallable implements Callable<ResultSet<ReadableArtifact>> { public ResultSet<ReadableArtifact> call() throws Exception { QueryContext queryContext = queryEngine.create(sessionContext.getSessionId(), criteriaSet, options); LoadOptions loadOptions = new LoadOptions(options.isHistorical(), options.areDeletedIncluded(), loadLevel); - List<ReadableArtifact> artifacts = objectLoader.load(queryContext, loadOptions, sessionContext); + checkForCancelled(); + List<ReadableArtifact> artifacts = objectLoader.load(this, queryContext, loadOptions, sessionContext); List<ReadableArtifact> results; if (!queryContext.getPostProcessors().isEmpty()) { results = new ArrayList<ReadableArtifact>(); for (QueryPostProcessor processor : queryContext.getPostProcessors()) { processor.setItemsToProcess(artifacts); + checkForCancelled(); List<Match<ReadableArtifact, ReadableAttribute<?>>> matches = processor.call(); for (Match<ReadableArtifact, ReadableAttribute<?>> match : matches) { + checkForCancelled(); results.add(match.getItem()); } } @@ -71,4 +74,5 @@ public class SearchCallable implements Callable<ResultSet<ReadableArtifact>> { } return new SearchResultSet<ReadableArtifact>(results); } + } diff --git a/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchCountCallable.java b/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchCountCallable.java index ed176b0973e..afc895f7a25 100644 --- a/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchCountCallable.java +++ b/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchCountCallable.java @@ -12,7 +12,7 @@ package org.eclipse.osee.orcs.core.internal.search.callable; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; +import org.eclipse.osee.executor.admin.CancellableCallable; import org.eclipse.osee.framework.core.enums.LoadLevel; import org.eclipse.osee.orcs.core.ds.CriteriaSet; import org.eclipse.osee.orcs.core.ds.LoadOptions; @@ -30,7 +30,7 @@ import org.eclipse.osee.orcs.search.Match; /** * @author Roberto E. Escobar */ -public class SearchCountCallable implements Callable<Integer> { +public class SearchCountCallable extends CancellableCallable<Integer> { private final QueryEngine queryEngine; private final OrcsObjectLoader objectLoader; @@ -55,16 +55,20 @@ public class SearchCountCallable implements Callable<Integer> { if (criteriaSet.hasCriteriaType(CriteriaAttributeKeyword.class)) { QueryContext queryContext = queryEngine.create(sessionContext.getSessionId(), criteriaSet, options); LoadOptions loadOptions = new LoadOptions(options.isHistorical(), options.areDeletedIncluded(), loadLevel); - List<ReadableArtifact> artifacts = objectLoader.load(queryContext, loadOptions, sessionContext); + + checkForCancelled(); + List<ReadableArtifact> artifacts = objectLoader.load(this, queryContext, loadOptions, sessionContext); List<ReadableArtifact> results; if (!queryContext.getPostProcessors().isEmpty()) { results = new ArrayList<ReadableArtifact>(); for (QueryPostProcessor processor : queryContext.getPostProcessors()) { processor.setItemsToProcess(artifacts); + checkForCancelled(); List<Match<ReadableArtifact, ReadableAttribute<?>>> matches = processor.call(); for (Match<ReadableArtifact, ReadableAttribute<?>> match : matches) { results.add(match.getItem()); + checkForCancelled(); } } } else { @@ -73,7 +77,8 @@ public class SearchCountCallable implements Callable<Integer> { count = results.size(); } else { QueryContext queryContext = queryEngine.createCount(sessionContext.getSessionId(), criteriaSet, options); - count = objectLoader.countObjects(queryContext); + checkForCancelled(); + count = objectLoader.countObjects(this, queryContext); } return count; } diff --git a/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchMatchesCallable.java b/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchMatchesCallable.java index 1233e211269..9519d051cc4 100644 --- a/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchMatchesCallable.java +++ b/plugins/org.eclipse.osee.orcs.core/src/org/eclipse/osee/orcs/core/internal/search/callable/SearchMatchesCallable.java @@ -14,7 +14,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; +import org.eclipse.osee.executor.admin.CancellableCallable; import org.eclipse.osee.framework.core.enums.LoadLevel; import org.eclipse.osee.orcs.core.ds.CriteriaSet; import org.eclipse.osee.orcs.core.ds.LoadOptions; @@ -33,7 +33,7 @@ import org.eclipse.osee.orcs.search.ResultSet; /** * @author Roberto E. Escobar */ -public class SearchMatchesCallable implements Callable<ResultSet<Match<ReadableArtifact, ReadableAttribute<?>>>> { +public class SearchMatchesCallable extends CancellableCallable<ResultSet<Match<ReadableArtifact, ReadableAttribute<?>>>> { private final QueryEngine queryEngine; private final OrcsObjectLoader objectLoader; @@ -56,7 +56,8 @@ public class SearchMatchesCallable implements Callable<ResultSet<Match<ReadableA public ResultSet<Match<ReadableArtifact, ReadableAttribute<?>>> call() throws Exception { QueryContext queryContext = queryEngine.create(sessionContext.getSessionId(), criteriaSet, options); LoadOptions loadOptions = new LoadOptions(options.isHistorical(), options.areDeletedIncluded(), loadLevel); - List<ReadableArtifact> artifacts = objectLoader.load(queryContext, loadOptions, sessionContext); + checkForCancelled(); + List<ReadableArtifact> artifacts = objectLoader.load(this, queryContext, loadOptions, sessionContext); List<Match<ReadableArtifact, ReadableAttribute<?>>> results = new ArrayList<Match<ReadableArtifact, ReadableAttribute<?>>>(); @@ -67,6 +68,7 @@ public class SearchMatchesCallable implements Callable<ResultSet<Match<ReadableA } for (QueryPostProcessor processor : processors) { processor.setItemsToProcess(artifacts); + checkForCancelled(); results.addAll(processor.call()); } return new SearchResultSet<Match<ReadableArtifact, ReadableAttribute<?>>>(results); diff --git a/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/loader/DataLoaderImpl.java b/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/loader/DataLoaderImpl.java index e5614c1caa5..34a10beb3c9 100644 --- a/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/loader/DataLoaderImpl.java +++ b/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/loader/DataLoaderImpl.java @@ -11,6 +11,8 @@ package org.eclipse.osee.orcs.db.internal.loader; import java.util.List; +import java.util.concurrent.CancellationException; +import org.eclipse.osee.executor.admin.HasCancellation; import org.eclipse.osee.framework.core.enums.LoadLevel; import org.eclipse.osee.framework.core.exception.OseeCoreException; import org.eclipse.osee.framework.core.services.IdentityService; @@ -109,7 +111,7 @@ public class DataLoaderImpl implements DataLoader { } @Override - public int countArtifacts(QueryContext queryContext) throws OseeCoreException { + public int countArtifacts(HasCancellation cancellation, QueryContext queryContext) throws OseeCoreException { SqlContext sqlContext = toSqlContext(queryContext); for (AbstractJoinQuery join : sqlContext.getJoins()) { join.store(); @@ -117,6 +119,8 @@ public class DataLoaderImpl implements DataLoader { String query = sqlContext.getSql(); List<Object> params = sqlContext.getParameters(); try { + checkCancelled(cancellation); + return oseeDatabaseService.runPreparedQueryFetchObject(-1, query, params.toArray()); } finally { for (AbstractJoinQuery join : sqlContext.getJoins()) { @@ -125,21 +129,35 @@ public class DataLoaderImpl implements DataLoader { } } + private void checkCancelled(HasCancellation cancellation) throws CancellationException { + if (cancellation != null) { + cancellation.checkForCancelled(); + } + } + @Override - public void loadArtifacts(ArtifactRowHandler handler, QueryContext queryContext, LoadOptions loadOptions, RelationRowHandlerFactory relationRowHandlerFactory, AttributeRowHandlerFactory attributeRowHandlerFactory) throws OseeCoreException { + public void loadArtifacts(HasCancellation cancellation, ArtifactRowHandler handler, QueryContext queryContext, LoadOptions loadOptions, RelationRowHandlerFactory relationRowHandlerFactory, AttributeRowHandlerFactory attributeRowHandlerFactory) throws OseeCoreException { SqlContext sqlContext = toSqlContext(queryContext); int fetchSize = computeFetchSize(sqlContext); - AbstractJoinQuery join = createArtifactIdJoin(sqlContext, fetchSize); + AbstractJoinQuery join = createArtifactIdJoin(cancellation, sqlContext, fetchSize); try { join.store(); int queryId = join.getQueryId(); + checkCancelled(cancellation); + artifactLoader.loadFromQueryId(handler, loadOptions, fetchSize, queryId); + + checkCancelled(cancellation); + if (isAttributeLoadingAllowed(loadOptions.getLoadLevel())) { AttributeRowHandler attrHandler = attributeRowHandlerFactory.createAttributeRowHandler(); attributeLoader.loadFromQueryId(attrHandler, loadOptions, fetchSize, queryId); } + + checkCancelled(cancellation); + if (isRelationLoadingAllowed(loadOptions.getLoadLevel())) { RelationRowHandler relHandler = relationRowHandlerFactory.createRelationRowHandler(); relationLoader.loadFromQueryId(relHandler, loadOptions, fetchSize, queryId); @@ -171,7 +189,7 @@ public class DataLoaderImpl implements DataLoader { return fetchSize; } - private AbstractJoinQuery createArtifactIdJoin(SqlContext sqlContext, int fetchSize) throws OseeCoreException { + private AbstractJoinQuery createArtifactIdJoin(HasCancellation cancellation, SqlContext sqlContext, int fetchSize) throws OseeCoreException { ArtifactJoinQuery artifactJoin = JoinUtility.createArtifactJoinQuery(oseeDatabaseService); for (AbstractJoinQuery join : sqlContext.getJoins()) { join.store(); @@ -179,6 +197,8 @@ public class DataLoaderImpl implements DataLoader { String query = sqlContext.getSql(); List<Object> params = sqlContext.getParameters(); try { + checkCancelled(cancellation); + Integer transactionId = -1; IOseeStatement chStmt = oseeDatabaseService.getStatement(); try { @@ -190,6 +210,8 @@ public class DataLoaderImpl implements DataLoader { transactionId = chStmt.getInt("transaction_id"); } artifactJoin.add(artId, branchId, transactionId); + + checkCancelled(cancellation); } } finally { chStmt.close(); @@ -201,5 +223,4 @@ public class DataLoaderImpl implements DataLoader { } return artifactJoin; } - } |