Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPawel Piech2010-10-18 16:50:06 +0000
committerPawel Piech2010-10-18 16:50:06 +0000
commitc6e0fac759022c8f0a9d138e11b3c87971325ab5 (patch)
treedffb56adbce52d688bc94a9a91fc56fb50106fa1
parent400a4127fdd2fab3aac96c07f279b3f1492c6497 (diff)
downloadorg.eclipse.cdt-c6e0fac759022c8f0a9d138e11b3c87971325ab5.tar.gz
org.eclipse.cdt-c6e0fac759022c8f0a9d138e11b3c87971325ab5.tar.xz
org.eclipse.cdt-c6e0fac759022c8f0a9d138e11b3c87971325ab5.zip
Bug 310345 - [concurrent] Asynchronous Cache Programming Model (ACPM) utilities for DSF
-rw-r--r--dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java324
-rw-r--r--dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ICache.java69
-rw-r--r--dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ImmediateInDsfExecutor.java44
-rw-r--r--dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java18
-rw-r--r--dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java15
-rw-r--r--dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java721
-rw-r--r--dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/TransactionTests.java178
7 files changed, 1353 insertions, 16 deletions
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java
new file mode 100644
index 00000000000..f3b512f04f6
--- /dev/null
+++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java
@@ -0,0 +1,324 @@
+package org.eclipse.cdt.dsf.concurrent;
+
+/*******************************************************************************
+ * Copyright (c) 2008 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.eclipse.cdt.dsf.internal.DsfPlugin;
+import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.Status;
+
+/**
+ * A base implementation of a general purpose cache. Sub classes must implement
+ * {@link #retrieve(DataRequestMonitor)} to fetch data from the data source.
+ * Sub-classes or clients are also responsible for calling {@link #disable()}
+ * and {@link #reset()} to manage the state of the cache in response to events
+ * from the data source.
+ * <p>
+ * This cache requires an executor to use. The executor is used to synchronize
+ * access to the cache state and data.
+ * </p>
+ * @since 2.2
+ */
+@ConfinedToDsfExecutor("fExecutor")
+public abstract class AbstractCache<V> implements ICache<V> {
+
+ private static final IStatus INVALID_STATUS = new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfStatusConstants.INVALID_STATE, "Cache invalid", null); //$NON-NLS-1$
+ private static final IStatus DISABLED_STATUS = new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfStatusConstants.INVALID_STATE, "Cache disabled", null); //$NON-NLS-1$
+
+ private class RequestCanceledListener implements RequestMonitor.ICanceledListener {
+ public void requestCanceled(final RequestMonitor canceledRm) {
+ fExecutor.execute(new Runnable() {
+ public void run() {
+ handleCanceledRm(canceledRm);
+ }
+ });
+ }
+ };
+
+ private RequestCanceledListener fRequestCanceledListener = new RequestCanceledListener();
+
+ private boolean fValid;
+
+ private V fData;
+ private IStatus fStatus = INVALID_STATUS;
+
+ @ThreadSafe
+ private Object fWaitingList;
+
+ private final ImmediateInDsfExecutor fExecutor;
+
+ public AbstractCache(ImmediateInDsfExecutor executor) {
+ fExecutor = executor;
+ }
+
+ public DsfExecutor getExecutor() {
+ return fExecutor.getDsfExecutor();
+ }
+
+ protected ImmediateInDsfExecutor getImmediateInDsfExecutor() {
+ return fExecutor;
+ }
+
+ /**
+ * Sub-classes should override this method to retrieve the cache data
+ * from its source.
+ *
+ * @param rm Request monitor for completion of data retrieval.
+ */
+ abstract protected void retrieve();
+
+
+ /**
+ * Called while holding a lock to "this". No new request will start until
+ * this call returns.
+ */
+ @ThreadSafe
+ abstract protected void canceled();
+
+ public boolean isValid() {
+ return fValid;
+ }
+
+ public V getData() {
+ return fData;
+ }
+
+ public IStatus getStatus() {
+ return fStatus;
+ }
+
+ public void request(final DataRequestMonitor<V> rm) {
+ wait(rm);
+ }
+
+ public void wait(RequestMonitor rm) {
+ assert fExecutor.getDsfExecutor().isInExecutorThread();
+
+ if (!fValid) {
+ boolean first = false;
+ synchronized (this) {
+ if (fWaitingList == null) {
+ first = true;
+ fWaitingList = rm;
+ } else if (fWaitingList instanceof RequestMonitor[]) {
+ RequestMonitor[] waitingList = (RequestMonitor[])fWaitingList;
+ int waitingListLength = waitingList.length;
+ int i;
+ for (i = 0; i < waitingListLength; i++) {
+ if (waitingList[i] == null) {
+ waitingList[i] = rm;
+ break;
+ }
+ }
+ if (i == waitingListLength) {
+ RequestMonitor[] newWaitingList = new RequestMonitor[waitingListLength + 1];
+ System.arraycopy(waitingList, 0, newWaitingList, 0, waitingListLength);
+ newWaitingList[waitingListLength] = rm;
+ fWaitingList = newWaitingList;
+ }
+ } else {
+ RequestMonitor[] newWaitingList = new RequestMonitor[2];
+ newWaitingList[0] = (RequestMonitor)fWaitingList;
+ newWaitingList[1] = rm;
+ fWaitingList = newWaitingList;
+ }
+ }
+ rm.addCancelListener(fRequestCanceledListener);
+ if (first) {
+ retrieve();
+ }
+ } else {
+ if (rm instanceof DataRequestMonitor<?>) {
+ @SuppressWarnings("unchecked")
+ DataRequestMonitor<V> drm = (DataRequestMonitor<V>)rm;
+ drm.setData(fData);
+ }
+ rm.setStatus(fStatus);
+ rm.done();
+ }
+ }
+
+ private void doSet(V data, IStatus status, boolean valid) {
+ assert fExecutor.getDsfExecutor().isInExecutorThread();
+
+ fData = data;
+ fStatus = status;
+ fValid = valid;
+
+ Object waiting = null;
+ synchronized(this) {
+ waiting = fWaitingList;
+ fWaitingList = null;
+ }
+ if (waiting != null) {
+ if (waiting instanceof RequestMonitor) {
+ completeWaitingRm((RequestMonitor)waiting);
+ } else if (waiting instanceof RequestMonitor[]) {
+ RequestMonitor[] waitingList = (RequestMonitor[])waiting;
+ for (int i = 0; i < waitingList.length; i++) {
+ if (waitingList[i] != null) {
+ completeWaitingRm(waitingList[i]);
+ }
+ }
+ }
+ waiting = null;
+ }
+ }
+
+ private void completeWaitingRm(RequestMonitor rm) {
+ if (rm instanceof DataRequestMonitor<?>) {
+ @SuppressWarnings("unchecked")
+ DataRequestMonitor<V> drm = (DataRequestMonitor<V>)rm;
+ drm.setData(fData);
+ }
+ rm.setStatus(fStatus);
+ rm.removeCancelListener(fRequestCanceledListener);
+ rm.done();
+ }
+
+ private void handleCanceledRm(final RequestMonitor rm) {
+
+ boolean found = false;
+ boolean waiting = false;
+ synchronized (this) {
+ if (rm.equals(fWaitingList)) {
+ found = true;
+ waiting = false;
+ fWaitingList = null;
+ } else if(fWaitingList instanceof RequestMonitor[]) {
+ RequestMonitor[] waitingList = (RequestMonitor[])fWaitingList;
+ for (int i = 0; i < waitingList.length; i++) {
+ if (!found && rm.equals(waitingList[i])) {
+ waitingList[i] = null;
+ found = true;
+ }
+ waiting = waiting || waitingList[i] != null;
+ }
+ }
+ if (/*found && */!waiting) {
+ canceled();
+ }
+ }
+
+ // If we have no clients waiting anymore, cancel the request
+ if (found) {
+ // We no longer need to listen to cancelations.
+ rm.removeCancelListener(fRequestCanceledListener);
+ rm.setStatus(Status.CANCEL_STATUS);
+ rm.done();
+ }
+
+ }
+
+ @ThreadSafe
+ protected boolean isCanceled() {
+ boolean canceled;
+ List<RequestMonitor> canceledRms = null;
+ synchronized (this) {
+ if (fWaitingList instanceof RequestMonitor && ((RequestMonitor)fWaitingList).isCanceled()) {
+ canceledRms = new ArrayList<RequestMonitor>(1);
+ canceledRms.add((RequestMonitor)fWaitingList);
+ fWaitingList = null;
+ } else if(fWaitingList instanceof RequestMonitor[]) {
+ boolean waiting = false;
+ RequestMonitor[] waitingList = (RequestMonitor[])fWaitingList;
+ for (int i = 0; i < waitingList.length; i++) {
+ if (waitingList[i] != null && waitingList[i].isCanceled()) {
+ if (canceledRms == null) {
+ canceledRms = new ArrayList<RequestMonitor>(1);
+ }
+ canceledRms.add( waitingList[i] );
+ waitingList[i] = null;
+ }
+ waiting = waiting || waitingList[i] != null;
+ }
+ if (!waiting) {
+ fWaitingList = null;
+ }
+ }
+ canceled = fWaitingList == null;
+ }
+ if (canceledRms != null) {
+ for (RequestMonitor canceledRm : canceledRms) {
+ canceledRm.setStatus(Status.CANCEL_STATUS);
+ canceledRm.removeCancelListener(fRequestCanceledListener);
+ canceledRm.done();
+ }
+ }
+
+ return canceled;
+ }
+
+ /**
+ * Resets the cache with a data value <code>null</code> and an error
+ * status with code {@link IDsfStatusConstants#INVALID_STATE}.
+ *
+ * @see #reset(Object, IStatus)
+ */
+ protected void reset() {
+ reset(null, INVALID_STATUS);
+ }
+
+ /**
+ * Resets the cache with given data and status. Resetting the cache
+ * forces the cache to be invalid and cancels any current pending requests
+ * from data source.
+ * <p>
+ * This method should be called when the data source has issued an event
+ * indicating that the source data has changed but data may still be
+ * retrieved. Clients may need to re-request data following cache reset.
+ * </p>
+ * @param data The data that should be returned to any clients currently
+ * waiting for cache data.
+ * @status The status that should be returned to any clients currently
+ * waiting for cache data.
+ */
+ protected void reset(V data, IStatus status) {
+ doSet(data, status, false);
+ }
+
+ /**
+ * Disables the cache from retrieving data from the source. If the cache
+ * is already valid the data and status is retained. If the cache is not
+ * valid, then data value <code>null</code> and an error status with code
+ * {@link IDsfStatusConstants#INVALID_STATE} are set.
+ *
+ * @see #set(Object, IStatus)
+ */
+ protected void disable() {
+ if (!fValid) {
+ set(null, DISABLED_STATUS);
+ }
+ }
+
+ /**
+ * Resets the cache then disables it. When a cache is disabled it means
+ * that it is valid and requests to the data source will not be sent.
+ * <p>
+ * This method should be called when the data source has issued an event
+ * indicating that the source data has changed and future requests for
+ * data will return the given data and status. Once the source data
+ * becomes available again, clients should call {@link #reset()}.
+ * </p>
+ * @param data The data that should be returned to any clients waiting for
+ * cache data and for clients requesting data until the cache is reset again.
+ * @status The status that should be returned to any clients waiting for
+ * cache data and for clients requesting data until the cache is reset again.
+ *
+ * @see #reset(Object, IStatus)
+ */
+ protected void set(V data, IStatus status) {
+ doSet(data, status, true);
+ }
+}
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ICache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ICache.java
new file mode 100644
index 00000000000..a92d45103ad
--- /dev/null
+++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ICache.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Copyright (c) 2010 Wind River Systems and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.cdt.dsf.concurrent;
+
+import org.eclipse.core.runtime.IStatus;
+
+/**
+ * The interface for a general purpose cache that caches the result of a single
+ * request. Implementations need to provide the logic to fetch data from an
+ * asynchronous data source.
+ * <p>
+ * This cache requires an executor to use. The executor is used to synchronize
+ * access to the cache state and data.
+ * </p>
+ * @since 2.2
+ */
+@ConfinedToDsfExecutor("getExecutor()")
+public interface ICache<V> {
+
+ /**
+ * The executor that must be used to access this cache.
+ */
+ public DsfExecutor getExecutor();
+
+ /**
+ * Returns the current data value held by this cache. Clients should first
+ * call isValid() to determine if the data is up to date.
+ */
+ public V getData();
+
+ /**
+ * Returns the status of the source request held by this cache. Clients
+ * should first call isValid() to determine if the data is up to date.
+ */
+ public IStatus getStatus();
+
+ /**
+ * Wait for the cache to become valid. If the cache is valid already, the
+ * request returns immediately, otherwise data will first be retrieved from the
+ * source.
+ *
+ * @param rm RequestMonitor that is called when cache becomes valid.
+ */
+ public void wait(RequestMonitor rm);
+
+ /**
+ * Request data from the cache. The cache is valid, it will complete the
+ * request immediately, otherwise data will first be retrieved from the
+ * source.
+ *
+ * @param rm RequestMonitor that is called when cache becomes valid.
+ */
+ public void request(DataRequestMonitor<V> rm);
+
+ /**
+ * Returns <code>true</code> if the cache is currently valid. I.e.
+ * whether the cache can return a value immediately without first
+ * retrieving it from the data source.
+ */
+ public boolean isValid();
+}
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ImmediateInDsfExecutor.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ImmediateInDsfExecutor.java
new file mode 100644
index 00000000000..55706dc4f9d
--- /dev/null
+++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ImmediateInDsfExecutor.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * Copyright (c) 2008 Wind River Systems and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.cdt.dsf.concurrent;
+
+import java.util.concurrent.Executor;
+
+
+/**
+ * @since 2.2
+ *
+ */
+public class ImmediateInDsfExecutor implements Executor {
+
+ final private DsfExecutor fDsfExecutor;
+
+ public DsfExecutor getDsfExecutor() {
+ return fDsfExecutor;
+ }
+
+ public ImmediateInDsfExecutor(DsfExecutor dsfExecutor) {
+ fDsfExecutor = dsfExecutor;
+ }
+
+ public void execute(final Runnable command) {
+ if (fDsfExecutor.isInExecutorThread()) {
+ command.run();
+ } else {
+ fDsfExecutor.execute(new DsfRunnable() {
+ public void run() {
+ command.run();
+ }
+ });
+ }
+ }
+
+}
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java
index 634716b2e34..1c52b3099eb 100644
--- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java
+++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java
@@ -24,7 +24,7 @@ import org.eclipse.core.runtime.Status;
* This cache requires an executor to use. The executor is used to synchronize
* access to the cache state and data.
* </p>
- * @since 2.1
+ * @since 2.2
*/
@ConfinedToDsfExecutor("fExecutor")
public abstract class RequestCache<V> extends AbstractCache<V> {
@@ -36,21 +36,15 @@ public abstract class RequestCache<V> extends AbstractCache<V> {
super(executor);
}
- /**
- * Sub-classes should override this method to retrieve the cache data
- * from its source.
- *
- * @param rm Request monitor for completion of data retrieval.
- */
@Override
- protected void retrieve() {
+ protected final void retrieve() {
// Make sure to cancel the previous rm. This may lead to the rm being
// canceled twice, but that's not harmful.
if (fRm != null) {
fRm.cancel();
}
- fRm = new DataRequestMonitor<V>(getExecutor(), null) {
+ fRm = new DataRequestMonitor<V>(getImmediateInDsfExecutor(), null) {
private IStatus fRawStatus = Status.OK_STATUS;
@@ -95,7 +89,7 @@ public abstract class RequestCache<V> extends AbstractCache<V> {
}
@Override
- public void reset(V data, IStatus status) {
+ protected void reset(V data, IStatus status) {
if (fRm != null) {
fRm.cancel();
fRm = null;
@@ -104,7 +98,7 @@ public abstract class RequestCache<V> extends AbstractCache<V> {
}
@Override
- public void disable() {
+ protected void disable() {
if (fRm != null) {
fRm.cancel();
fRm = null;
@@ -113,7 +107,7 @@ public abstract class RequestCache<V> extends AbstractCache<V> {
}
@Override
- public void set(V data, IStatus status) {
+ protected void set(V data, IStatus status) {
if (fRm != null) {
fRm.cancel();
fRm = null;
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java
index 8de8d5ae07f..58ed79cbfe8 100644
--- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java
+++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java
@@ -122,7 +122,7 @@ public abstract class Transaction<V> {
* @throws CoreException
* if an error was encountered getting the data from the source
*/
- protected void validate(RequestCache<?> cache) throws InvalidCacheException, CoreException {
+ protected void validate(ICache<?> cache) throws InvalidCacheException, CoreException {
if (cache.isValid()) {
if (!cache.getStatus().isOK()) {
throw new CoreException(cache.getStatus());
@@ -141,15 +141,22 @@ public abstract class Transaction<V> {
}
}
+ /**
+ * See {@link #validate(RequestCache)}. This variant simply validates
+ * multiple cache objects.
+ */
+ protected void validate(RequestCache<?> ... caches) throws InvalidCacheException, CoreException {
+ }
+
/**
* See {@link #validate(RequestCache)}. This variant simply validates
* multiple cache objects.
*/
- protected void validate(RequestCache<?> ... caches) throws InvalidCacheException, CoreException {
+ protected void validate(Iterable<ICache<?>> caches) throws InvalidCacheException, CoreException {
// Check if any of the caches have errors:
boolean allValid = true;
- for (RequestCache<?> cache : caches) {
+ for (ICache<?> cache : caches) {
if (cache.isValid()) {
if (!cache.getStatus().isOK()) {
throw new CoreException(cache.getStatus());
@@ -169,7 +176,7 @@ public abstract class Transaction<V> {
}
};
int count = 0;
- for (RequestCache<?> cache : caches) {
+ for (ICache<?> cache : caches) {
if (!cache.isValid()) {
cache.wait(countringRm);
count++;
diff --git a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java
new file mode 100644
index 00000000000..b6455fd62dd
--- /dev/null
+++ b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java
@@ -0,0 +1,721 @@
+/*******************************************************************************
+ * Copyright (c) 2006 Wind River Systems and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.cdt.tests.dsf.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+
+import junit.framework.Assert;
+
+import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor;
+import org.eclipse.cdt.dsf.concurrent.DsfRunnable;
+import org.eclipse.cdt.dsf.concurrent.IDsfStatusConstants;
+import org.eclipse.cdt.dsf.concurrent.ImmediateExecutor;
+import org.eclipse.cdt.dsf.concurrent.ImmediateInDsfExecutor;
+import org.eclipse.cdt.dsf.concurrent.Query;
+import org.eclipse.cdt.dsf.concurrent.RequestCache;
+import org.eclipse.cdt.tests.dsf.TestDsfExecutor;
+import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.Status;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that exercise the DataCache object.
+ */
+public class CacheTests {
+
+ TestDsfExecutor fExecutor;
+ TestCache fTestCache;
+ DataRequestMonitor<Integer> fRetrieveRm;
+
+ class TestCache extends RequestCache<Integer> {
+
+ public TestCache() {
+ super(new ImmediateInDsfExecutor(fExecutor));
+ }
+
+ @Override
+ protected void retrieve(DataRequestMonitor<Integer> rm) {
+ synchronized(CacheTests.this) {
+ fRetrieveRm = rm;
+ CacheTests.this.notifyAll();
+ }
+ }
+
+ @Override
+ protected void reset() {
+ super.reset();
+ }
+
+ @Override
+ public void reset(Integer data, IStatus status) {
+ super.reset(data, status);
+ }
+
+ @Override
+ public void disable() {
+ super.disable();
+ }
+
+ @Override
+ public void set(Integer data, IStatus status) {
+ super.set(data, status);
+ }
+
+ }
+
+ /**
+ * There's no rule on how quickly the cache has to start data retrieval
+ * after it has been requested. It could do it immediately, or it could
+ * wait a dispatch cycle, etc..
+ */
+ private void waitForRetrieveRm() {
+ synchronized(this) {
+ while (fRetrieveRm == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+ }
+
+ @Before
+ public void startExecutor() throws ExecutionException, InterruptedException {
+ fExecutor = new TestDsfExecutor();
+ fTestCache = new TestCache();
+ }
+
+ @After
+ public void shutdownExecutor() throws ExecutionException, InterruptedException {
+ fExecutor.submit(new DsfRunnable() { public void run() {
+ fExecutor.shutdown();
+ }}).get();
+ if (fExecutor.exceptionsCaught()) {
+ Throwable[] exceptions = fExecutor.getExceptions();
+ throw new ExecutionException(exceptions[0]);
+ }
+ fRetrieveRm = null;
+ fTestCache = null;
+ fExecutor = null;
+ }
+
+ private void assertCacheValidWithData(Object data) {
+ Assert.assertTrue(fTestCache.isValid());
+ Assert.assertEquals(data, fTestCache.getData());
+ Assert.assertTrue(fTestCache.getStatus().isOK());
+ }
+
+ private void assertCacheResetWithoutData() {
+ Assert.assertFalse(fTestCache.isValid());
+ Assert.assertEquals(null, fTestCache.getData());
+ Assert.assertFalse(fTestCache.getStatus().isOK());
+ Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE);
+ }
+
+ private void assertCacheDisabledWithoutData() {
+ Assert.assertTrue(fTestCache.isValid());
+ Assert.assertEquals(null, fTestCache.getData());
+ Assert.assertFalse(fTestCache.getStatus().isOK());
+ Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE);
+ }
+
+ private void assertCacheWaiting() {
+ Assert.assertFalse(fTestCache.isValid());
+ Assert.assertEquals(null, fTestCache.getData());
+ Assert.assertFalse(fTestCache.getStatus().isOK());
+ Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE);
+ Assert.assertFalse(fRetrieveRm.isCanceled());
+ }
+
+ private void assertCacheCanceled() {
+ Assert.assertFalse(fTestCache.isValid());
+ Assert.assertEquals(null, fTestCache.getData());
+ Assert.assertFalse(fTestCache.getStatus().isOK());
+ Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE);
+ Assert.assertTrue(fRetrieveRm.isCanceled());
+ }
+
+ @Test
+ public void getWithCompletionInDsfThreadTest() throws InterruptedException, ExecutionException {
+ // Request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ // Check initial state
+ Assert.assertFalse(fTestCache.isValid());
+ Assert.assertFalse(fTestCache.getStatus().isOK());
+ Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE);
+
+ fExecutor.execute(q);
+
+ // Wait until the cache requests the data.
+ waitForRetrieveRm();
+
+ // Check state while waiting for data
+ Assert.assertFalse(fTestCache.isValid());
+
+ // Complete the cache's retrieve data request.
+ fExecutor.submit(new Callable<Object>() { public Object call() {
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+
+ // Check that the data is available in the cache immediately
+ // (in the same dispatch cycle).
+ Assert.assertEquals(1, (int)fTestCache.getData());
+ Assert.assertTrue(fTestCache.isValid());
+
+ return null;
+ }}).get();
+
+ Assert.assertEquals(1, (int)q.get());
+
+ // Re-check final state
+ assertCacheValidWithData(1);
+ }
+
+ @Test
+ public void getTest() throws InterruptedException, ExecutionException {
+ // Check initial state
+ Assert.assertFalse(fTestCache.isValid());
+
+ // Request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Check state while waiting for data
+ Assert.assertFalse(fTestCache.isValid());
+
+ // Set the data without using an executor.
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+
+ Assert.assertEquals(1, (int)q.get());
+
+ // Check final state
+ assertCacheValidWithData(1);
+ }
+
+ @Test
+ public void getTestWithTwoClients() throws InterruptedException, ExecutionException {
+ // Check initial state
+ Assert.assertFalse(fTestCache.isValid());
+
+ // Request data from cache
+ Query<Integer> q1 = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q1);
+
+ // Request data from cache again
+ Query<Integer> q2 = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q2);
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Check state while waiting for data
+ Assert.assertFalse(fTestCache.isValid());
+
+ // Set the data without using an executor.
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+
+ Assert.assertEquals(1, (int)q1.get());
+ Assert.assertEquals(1, (int)q2.get());
+
+ // Check final state
+ assertCacheValidWithData(1);
+ }
+
+ @Test
+ public void getTestWithManyClients() throws InterruptedException, ExecutionException {
+ // Check initial state
+ Assert.assertFalse(fTestCache.isValid());
+
+ // Request data from cache
+ List<Query<Integer>> qList = new ArrayList<Query<Integer>>();
+ for (int i = 0; i < 10; i++) {
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+ qList.add(q);
+ }
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Check state while waiting for data
+ Assert.assertFalse(fTestCache.isValid());
+
+ // Set the data without using an executor.
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+
+ for (Query<Integer> q : qList) {
+ Assert.assertEquals(1, (int)q.get());
+ }
+
+ // Check final state
+ assertCacheValidWithData(1);
+ }
+
+ @Test
+ public void disableBeforeRequestTest() throws InterruptedException, ExecutionException {
+ // Disable the cache with a given value
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fTestCache.disable();
+ }
+ }).get();
+
+ assertCacheDisabledWithoutData();
+
+ // Try to request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+
+ Thread.sleep(100);
+
+ // Retrieval should never have been made.
+ Assert.assertEquals(null, fRetrieveRm);
+
+ try {
+ Assert.assertEquals(null, q.get());
+ } catch (ExecutionException e) {
+ // expected the exception
+ return;
+ }
+ Assert.fail("expected an exeption");
+ }
+
+ @Test
+ public void disableWhilePendingTest() throws InterruptedException, ExecutionException {
+ // Request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+
+ // Disable the cache with a given value
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fTestCache.disable();
+ }
+ }).get();
+
+ assertCacheDisabledWithoutData();
+
+ // Completed the retrieve RM
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+ }
+ }).get();
+
+ // Validate that cache is still disabled without data.
+ assertCacheDisabledWithoutData();
+ }
+
+ @Test
+ public void disableWhileValidTest() throws InterruptedException, ExecutionException {
+ // Request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Complete the request
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+
+ q.get();
+
+ // Disable cache
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fTestCache.disable();
+ }
+ }).get();
+
+ // Check final state
+ assertCacheValidWithData(1);
+ }
+
+ @Test
+ public void disableWithValueTest() throws InterruptedException, ExecutionException {
+ // Disable the cache with a given value
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fTestCache.set(2, Status.OK_STATUS);
+ }
+ }).get();
+
+ // Validate that cache is disabled without data.
+ assertCacheValidWithData(2);
+ }
+
+
+ @Test
+ public void resetBeforeRequestTest() throws InterruptedException, ExecutionException {
+ // Disable the cache with a given value
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fTestCache.reset();
+ }
+ }).get();
+
+ assertCacheResetWithoutData();
+
+ // Try to request data from cache (check that cache still works normally)
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Complete the request
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+
+ // Check result
+ Assert.assertEquals(1, (int)q.get());
+
+ assertCacheValidWithData(1);
+ }
+
+ @Test
+ public void resetWhilePendingTest() throws InterruptedException, ExecutionException {
+ // Request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Disable the cache with a given value
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fTestCache.reset();
+ }
+ }).get();
+
+ assertCacheResetWithoutData();
+
+ // Completed the retrieve RM
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+ }
+ }).get();
+
+ // Validate that cache is still disabled without data.
+ assertCacheResetWithoutData();
+ }
+
+ @Test
+ public void cancelWhilePendingTest() throws InterruptedException, ExecutionException {
+ // Request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Cancel the client request
+ q.cancel(true);
+ try {
+ q.get();
+ Assert.fail("Expected a cancellation exception");
+ } catch (CancellationException e) {} // Expected exception;
+
+ assertCacheCanceled();
+
+ // Completed the retrieve RM
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+ }
+ }).get();
+
+ // Validate that cache accepts the canceled request data
+ assertCacheValidWithData(1);
+ }
+
+ @Test
+ public void cancelWhilePendingWithoutClientNotificationTest() throws InterruptedException, ExecutionException {
+ // Request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(new DataRequestMonitor<Integer>(ImmediateExecutor.getInstance(), rm) {
+ @Override
+ public synchronized void addCancelListener(ICanceledListener listener) {
+ // Do not add the cancel listener so that the cancel request is not
+ // propagated to the cache.
+ }
+ });
+ }
+ };
+ fExecutor.execute(q);
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Cancel the client request
+ q.cancel(true);
+
+ assertCacheCanceled();
+
+ try {
+ q.get();
+ Assert.fail("Expected a cancellation exception");
+ } catch (CancellationException e) {} // Expected exception;
+
+ // Completed the retrieve RM
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+ }
+ }).get();
+
+ // Validate that cache accepts the canceled request data
+ assertCacheValidWithData(1);
+ }
+
+ @Test
+ public void cancelWhilePendingWithTwoClientsTest() throws InterruptedException, ExecutionException {
+ // Request data from cache
+ Query<Integer> q1 = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q1);
+
+ // Request data from cache again
+ Query<Integer> q2 = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q2);
+
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Cancel the first client request
+ q1.cancel(true);
+ try {
+ q1.get();
+ Assert.fail("Expected a cancellation exception");
+ } catch (CancellationException e) {} // Expected exception;
+ assertCacheWaiting();
+
+ // Cancel the second request
+ q2.cancel(true);
+ try {
+ q2.get();
+ Assert.fail("Expected a cancellation exception");
+ } catch (CancellationException e) {} // Expected exception;
+
+ assertCacheCanceled();
+
+ // Completed the retrieve RM
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+ }
+ }).get();
+
+ // Validate that cache accepts the canceled request data
+ assertCacheValidWithData(1);
+ }
+
+ @Test
+ public void cancelWhilePendingWithManyClientsTest() throws InterruptedException, ExecutionException {
+ // Request data from cache
+ List<Query<Integer>> qList = new ArrayList<Query<Integer>>();
+ for (int i = 0; i < 10; i++) {
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+ qList.add(q);
+ }
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Cancel some client requests
+ int[] toCancel = new int[] { 0, 2, 5, 9};
+ for (int i = 0; i < toCancel.length; i++) {
+
+ // Cancel request and verify that its canceled
+ Query<Integer> q = qList.get(toCancel[i]);
+ q.cancel(true);
+ try {
+ q.get();
+ Assert.fail("Expected a cancellation exception");
+ } catch (CancellationException e) {} // Expected exception;
+ qList.set(toCancel[i], null);
+
+ assertCacheWaiting();
+ }
+
+ // Replace canceled requests with new ones
+ for (int i = 0; i < toCancel.length; i++) {
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+ qList.set(toCancel[i], q);
+ assertCacheWaiting();
+ }
+
+ // Now cancel all requests
+ for (int i = 0; i < (qList.size() - 1); i++) {
+ // Validate that cache is still waiting and is not canceled
+ assertCacheWaiting();
+ qList.get(i).cancel(true);
+ }
+ qList.get(qList.size() - 1).cancel(true);
+ assertCacheCanceled();
+
+ // Completed the retrieve RM
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+ }
+ }).get();
+
+ // Validate that cache accepts the canceled request data
+ assertCacheValidWithData(1);
+ }
+
+ @Test
+ public void resetWhileValidTest() throws InterruptedException, ExecutionException {
+ // Request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ fTestCache.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm();
+
+ // Complete the request
+ fRetrieveRm.setData(1);
+ fRetrieveRm.done();
+
+ q.get();
+
+ // Disable cache
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fTestCache.reset();
+ }
+ }).get();
+
+ // Check final state
+ assertCacheResetWithoutData();
+ }
+
+ @Test
+ public void resetWithValueTest() throws InterruptedException, ExecutionException {
+ // Disable the cache with a given value
+ fExecutor.submit(new DsfRunnable() {
+ public void run() {
+ fTestCache.reset(2, Status.OK_STATUS);
+ }
+ }).get();
+
+ // Validate that cache is disabled without data.
+ Assert.assertFalse(fTestCache.isValid());
+ Assert.assertEquals(2, (int)fTestCache.getData());
+ Assert.assertTrue(fTestCache.getStatus().isOK());
+ }
+}
diff --git a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/TransactionTests.java b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/TransactionTests.java
new file mode 100644
index 00000000000..1b53c51bc30
--- /dev/null
+++ b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/TransactionTests.java
@@ -0,0 +1,178 @@
+/*******************************************************************************
+ * Copyright (c) 2006 Wind River Systems and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.cdt.tests.dsf.concurrent;
+
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+
+import junit.framework.Assert;
+
+import org.eclipse.cdt.dsf.concurrent.RequestCache;
+import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor;
+import org.eclipse.cdt.dsf.concurrent.DsfRunnable;
+import org.eclipse.cdt.dsf.concurrent.ImmediateInDsfExecutor;
+import org.eclipse.cdt.dsf.concurrent.Query;
+import org.eclipse.cdt.dsf.concurrent.Transaction;
+import org.eclipse.cdt.tests.dsf.TestDsfExecutor;
+import org.eclipse.core.runtime.CoreException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that exercise the Transaction object.
+ */
+public class TransactionTests {
+ final static private int NUM_CACHES = 5;
+
+ TestDsfExecutor fExecutor;
+ TestCache[] fTestCaches = new TestCache[NUM_CACHES];
+ DataRequestMonitor<?>[] fRetrieveRms = new DataRequestMonitor<?>[NUM_CACHES];
+
+ class TestCache extends RequestCache<Integer> {
+
+ final private int fIndex;
+
+ public TestCache(int index) {
+ super(new ImmediateInDsfExecutor(fExecutor));
+ fIndex = index;
+ }
+
+ @Override
+ protected void retrieve(DataRequestMonitor<Integer> rm) {
+ synchronized(TransactionTests.this) {
+ fRetrieveRms[fIndex] = rm;
+ TransactionTests.this.notifyAll();
+ }
+ }
+
+ }
+
+ class TestSingleTransaction extends Transaction<Integer> {
+
+ @Override
+ protected Integer process() throws InvalidCacheException, CoreException {
+ validate(fTestCaches[0]);
+ return fTestCaches[0].getData();
+ }
+ }
+
+ class TestSumTransaction extends Transaction<Integer> {
+ @Override
+ protected Integer process() throws InvalidCacheException, CoreException {
+ validate(fTestCaches);
+
+ int sum = 0;
+ for (RequestCache<Integer> cache : fTestCaches) {
+ sum += cache.getData();
+ }
+ return sum;
+ }
+ }
+
+ /**
+ * There's no rule on how quickly the cache has to start data retrieval
+ * after it has been requested. It could do it immediately, or it could
+ * wait a dispatch cycle, etc..
+ */
+ private void waitForRetrieveRm(boolean all) {
+ synchronized(this) {
+ if (all) {
+ while (Arrays.asList(fRetrieveRms).contains(null)) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ } else {
+ while (fRetrieveRms[0] == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ @Before
+ public void startExecutor() throws ExecutionException, InterruptedException {
+ fExecutor = new TestDsfExecutor();
+ for (int i = 0; i < fTestCaches.length; i++) {
+ fTestCaches[i] = new TestCache(i);
+ }
+ }
+
+ @After
+ public void shutdownExecutor() throws ExecutionException, InterruptedException {
+ fExecutor.submit(new DsfRunnable() { public void run() {
+ fExecutor.shutdown();
+ }}).get();
+ if (fExecutor.exceptionsCaught()) {
+ Throwable[] exceptions = fExecutor.getExceptions();
+ throw new ExecutionException(exceptions[0]);
+ }
+ fRetrieveRms = new DataRequestMonitor<?>[NUM_CACHES];
+ fTestCaches = new TestCache[NUM_CACHES];
+ fExecutor = null;
+ }
+
+ @Test
+ public void singleTransactionTest() throws InterruptedException, ExecutionException {
+ final TestSingleTransaction testTransaction = new TestSingleTransaction();
+ // Request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ testTransaction.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm(false);
+
+ // Set the data without using an executor.
+ ((DataRequestMonitor<Integer>)fRetrieveRms[0]).setData(1);
+ fRetrieveRms[0].done();
+
+ Assert.assertEquals(1, (int)q.get());
+ }
+
+ @Test
+ public void sumTransactionTest() throws InterruptedException, ExecutionException {
+
+ final TestSumTransaction testTransaction = new TestSumTransaction();
+ // Request data from cache
+ Query<Integer> q = new Query<Integer>() {
+ @Override
+ protected void execute(DataRequestMonitor<Integer> rm) {
+ testTransaction.request(rm);
+ }
+ };
+ fExecutor.execute(q);
+
+ // Wait until the cache starts data retrieval.
+ waitForRetrieveRm(true);
+
+ // Set the data without using an executor.
+ for (DataRequestMonitor<?> rm : fRetrieveRms) {
+ ((DataRequestMonitor<Integer>)rm).setData(1);
+ rm.done();
+ }
+
+ fExecutor.execute(q);
+ Assert.assertEquals(NUM_CACHES, (int)q.get());
+ }
+
+}

Back to the top