diff options
author | Pawel Piech | 2010-10-18 16:50:06 +0000 |
---|---|---|
committer | Pawel Piech | 2010-10-18 16:50:06 +0000 |
commit | c6e0fac759022c8f0a9d138e11b3c87971325ab5 (patch) | |
tree | dffb56adbce52d688bc94a9a91fc56fb50106fa1 | |
parent | 400a4127fdd2fab3aac96c07f279b3f1492c6497 (diff) | |
download | org.eclipse.cdt-c6e0fac759022c8f0a9d138e11b3c87971325ab5.tar.gz org.eclipse.cdt-c6e0fac759022c8f0a9d138e11b3c87971325ab5.tar.xz org.eclipse.cdt-c6e0fac759022c8f0a9d138e11b3c87971325ab5.zip |
Bug 310345 - [concurrent] Asynchronous Cache Programming Model (ACPM) utilities for DSF
7 files changed, 1353 insertions, 16 deletions
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java new file mode 100644 index 00000000000..f3b512f04f6 --- /dev/null +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java @@ -0,0 +1,324 @@ +package org.eclipse.cdt.dsf.concurrent; + +/******************************************************************************* + * Copyright (c) 2008 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ + +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.cdt.dsf.internal.DsfPlugin; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; + +/** + * A base implementation of a general purpose cache. Sub classes must implement + * {@link #retrieve(DataRequestMonitor)} to fetch data from the data source. + * Sub-classes or clients are also responsible for calling {@link #disable()} + * and {@link #reset()} to manage the state of the cache in response to events + * from the data source. + * <p> + * This cache requires an executor to use. The executor is used to synchronize + * access to the cache state and data. + * </p> + * @since 2.2 + */ +@ConfinedToDsfExecutor("fExecutor") +public abstract class AbstractCache<V> implements ICache<V> { + + private static final IStatus INVALID_STATUS = new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfStatusConstants.INVALID_STATE, "Cache invalid", null); //$NON-NLS-1$ + private static final IStatus DISABLED_STATUS = new Status(IStatus.ERROR, DsfPlugin.PLUGIN_ID, IDsfStatusConstants.INVALID_STATE, "Cache disabled", null); //$NON-NLS-1$ + + private class RequestCanceledListener implements RequestMonitor.ICanceledListener { + public void requestCanceled(final RequestMonitor canceledRm) { + fExecutor.execute(new Runnable() { + public void run() { + handleCanceledRm(canceledRm); + } + }); + } + }; + + private RequestCanceledListener fRequestCanceledListener = new RequestCanceledListener(); + + private boolean fValid; + + private V fData; + private IStatus fStatus = INVALID_STATUS; + + @ThreadSafe + private Object fWaitingList; + + private final ImmediateInDsfExecutor fExecutor; + + public AbstractCache(ImmediateInDsfExecutor executor) { + fExecutor = executor; + } + + public DsfExecutor getExecutor() { + return fExecutor.getDsfExecutor(); + } + + protected ImmediateInDsfExecutor getImmediateInDsfExecutor() { + return fExecutor; + } + + /** + * Sub-classes should override this method to retrieve the cache data + * from its source. + * + * @param rm Request monitor for completion of data retrieval. + */ + abstract protected void retrieve(); + + + /** + * Called while holding a lock to "this". No new request will start until + * this call returns. + */ + @ThreadSafe + abstract protected void canceled(); + + public boolean isValid() { + return fValid; + } + + public V getData() { + return fData; + } + + public IStatus getStatus() { + return fStatus; + } + + public void request(final DataRequestMonitor<V> rm) { + wait(rm); + } + + public void wait(RequestMonitor rm) { + assert fExecutor.getDsfExecutor().isInExecutorThread(); + + if (!fValid) { + boolean first = false; + synchronized (this) { + if (fWaitingList == null) { + first = true; + fWaitingList = rm; + } else if (fWaitingList instanceof RequestMonitor[]) { + RequestMonitor[] waitingList = (RequestMonitor[])fWaitingList; + int waitingListLength = waitingList.length; + int i; + for (i = 0; i < waitingListLength; i++) { + if (waitingList[i] == null) { + waitingList[i] = rm; + break; + } + } + if (i == waitingListLength) { + RequestMonitor[] newWaitingList = new RequestMonitor[waitingListLength + 1]; + System.arraycopy(waitingList, 0, newWaitingList, 0, waitingListLength); + newWaitingList[waitingListLength] = rm; + fWaitingList = newWaitingList; + } + } else { + RequestMonitor[] newWaitingList = new RequestMonitor[2]; + newWaitingList[0] = (RequestMonitor)fWaitingList; + newWaitingList[1] = rm; + fWaitingList = newWaitingList; + } + } + rm.addCancelListener(fRequestCanceledListener); + if (first) { + retrieve(); + } + } else { + if (rm instanceof DataRequestMonitor<?>) { + @SuppressWarnings("unchecked") + DataRequestMonitor<V> drm = (DataRequestMonitor<V>)rm; + drm.setData(fData); + } + rm.setStatus(fStatus); + rm.done(); + } + } + + private void doSet(V data, IStatus status, boolean valid) { + assert fExecutor.getDsfExecutor().isInExecutorThread(); + + fData = data; + fStatus = status; + fValid = valid; + + Object waiting = null; + synchronized(this) { + waiting = fWaitingList; + fWaitingList = null; + } + if (waiting != null) { + if (waiting instanceof RequestMonitor) { + completeWaitingRm((RequestMonitor)waiting); + } else if (waiting instanceof RequestMonitor[]) { + RequestMonitor[] waitingList = (RequestMonitor[])waiting; + for (int i = 0; i < waitingList.length; i++) { + if (waitingList[i] != null) { + completeWaitingRm(waitingList[i]); + } + } + } + waiting = null; + } + } + + private void completeWaitingRm(RequestMonitor rm) { + if (rm instanceof DataRequestMonitor<?>) { + @SuppressWarnings("unchecked") + DataRequestMonitor<V> drm = (DataRequestMonitor<V>)rm; + drm.setData(fData); + } + rm.setStatus(fStatus); + rm.removeCancelListener(fRequestCanceledListener); + rm.done(); + } + + private void handleCanceledRm(final RequestMonitor rm) { + + boolean found = false; + boolean waiting = false; + synchronized (this) { + if (rm.equals(fWaitingList)) { + found = true; + waiting = false; + fWaitingList = null; + } else if(fWaitingList instanceof RequestMonitor[]) { + RequestMonitor[] waitingList = (RequestMonitor[])fWaitingList; + for (int i = 0; i < waitingList.length; i++) { + if (!found && rm.equals(waitingList[i])) { + waitingList[i] = null; + found = true; + } + waiting = waiting || waitingList[i] != null; + } + } + if (/*found && */!waiting) { + canceled(); + } + } + + // If we have no clients waiting anymore, cancel the request + if (found) { + // We no longer need to listen to cancelations. + rm.removeCancelListener(fRequestCanceledListener); + rm.setStatus(Status.CANCEL_STATUS); + rm.done(); + } + + } + + @ThreadSafe + protected boolean isCanceled() { + boolean canceled; + List<RequestMonitor> canceledRms = null; + synchronized (this) { + if (fWaitingList instanceof RequestMonitor && ((RequestMonitor)fWaitingList).isCanceled()) { + canceledRms = new ArrayList<RequestMonitor>(1); + canceledRms.add((RequestMonitor)fWaitingList); + fWaitingList = null; + } else if(fWaitingList instanceof RequestMonitor[]) { + boolean waiting = false; + RequestMonitor[] waitingList = (RequestMonitor[])fWaitingList; + for (int i = 0; i < waitingList.length; i++) { + if (waitingList[i] != null && waitingList[i].isCanceled()) { + if (canceledRms == null) { + canceledRms = new ArrayList<RequestMonitor>(1); + } + canceledRms.add( waitingList[i] ); + waitingList[i] = null; + } + waiting = waiting || waitingList[i] != null; + } + if (!waiting) { + fWaitingList = null; + } + } + canceled = fWaitingList == null; + } + if (canceledRms != null) { + for (RequestMonitor canceledRm : canceledRms) { + canceledRm.setStatus(Status.CANCEL_STATUS); + canceledRm.removeCancelListener(fRequestCanceledListener); + canceledRm.done(); + } + } + + return canceled; + } + + /** + * Resets the cache with a data value <code>null</code> and an error + * status with code {@link IDsfStatusConstants#INVALID_STATE}. + * + * @see #reset(Object, IStatus) + */ + protected void reset() { + reset(null, INVALID_STATUS); + } + + /** + * Resets the cache with given data and status. Resetting the cache + * forces the cache to be invalid and cancels any current pending requests + * from data source. + * <p> + * This method should be called when the data source has issued an event + * indicating that the source data has changed but data may still be + * retrieved. Clients may need to re-request data following cache reset. + * </p> + * @param data The data that should be returned to any clients currently + * waiting for cache data. + * @status The status that should be returned to any clients currently + * waiting for cache data. + */ + protected void reset(V data, IStatus status) { + doSet(data, status, false); + } + + /** + * Disables the cache from retrieving data from the source. If the cache + * is already valid the data and status is retained. If the cache is not + * valid, then data value <code>null</code> and an error status with code + * {@link IDsfStatusConstants#INVALID_STATE} are set. + * + * @see #set(Object, IStatus) + */ + protected void disable() { + if (!fValid) { + set(null, DISABLED_STATUS); + } + } + + /** + * Resets the cache then disables it. When a cache is disabled it means + * that it is valid and requests to the data source will not be sent. + * <p> + * This method should be called when the data source has issued an event + * indicating that the source data has changed and future requests for + * data will return the given data and status. Once the source data + * becomes available again, clients should call {@link #reset()}. + * </p> + * @param data The data that should be returned to any clients waiting for + * cache data and for clients requesting data until the cache is reset again. + * @status The status that should be returned to any clients waiting for + * cache data and for clients requesting data until the cache is reset again. + * + * @see #reset(Object, IStatus) + */ + protected void set(V data, IStatus status) { + doSet(data, status, true); + } +} diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ICache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ICache.java new file mode 100644 index 00000000000..a92d45103ad --- /dev/null +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ICache.java @@ -0,0 +1,69 @@ +/******************************************************************************* + * Copyright (c) 2010 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.cdt.dsf.concurrent; + +import org.eclipse.core.runtime.IStatus; + +/** + * The interface for a general purpose cache that caches the result of a single + * request. Implementations need to provide the logic to fetch data from an + * asynchronous data source. + * <p> + * This cache requires an executor to use. The executor is used to synchronize + * access to the cache state and data. + * </p> + * @since 2.2 + */ +@ConfinedToDsfExecutor("getExecutor()") +public interface ICache<V> { + + /** + * The executor that must be used to access this cache. + */ + public DsfExecutor getExecutor(); + + /** + * Returns the current data value held by this cache. Clients should first + * call isValid() to determine if the data is up to date. + */ + public V getData(); + + /** + * Returns the status of the source request held by this cache. Clients + * should first call isValid() to determine if the data is up to date. + */ + public IStatus getStatus(); + + /** + * Wait for the cache to become valid. If the cache is valid already, the + * request returns immediately, otherwise data will first be retrieved from the + * source. + * + * @param rm RequestMonitor that is called when cache becomes valid. + */ + public void wait(RequestMonitor rm); + + /** + * Request data from the cache. The cache is valid, it will complete the + * request immediately, otherwise data will first be retrieved from the + * source. + * + * @param rm RequestMonitor that is called when cache becomes valid. + */ + public void request(DataRequestMonitor<V> rm); + + /** + * Returns <code>true</code> if the cache is currently valid. I.e. + * whether the cache can return a value immediately without first + * retrieving it from the data source. + */ + public boolean isValid(); +} diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ImmediateInDsfExecutor.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ImmediateInDsfExecutor.java new file mode 100644 index 00000000000..55706dc4f9d --- /dev/null +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/ImmediateInDsfExecutor.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * Copyright (c) 2008 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.cdt.dsf.concurrent; + +import java.util.concurrent.Executor; + + +/** + * @since 2.2 + * + */ +public class ImmediateInDsfExecutor implements Executor { + + final private DsfExecutor fDsfExecutor; + + public DsfExecutor getDsfExecutor() { + return fDsfExecutor; + } + + public ImmediateInDsfExecutor(DsfExecutor dsfExecutor) { + fDsfExecutor = dsfExecutor; + } + + public void execute(final Runnable command) { + if (fDsfExecutor.isInExecutorThread()) { + command.run(); + } else { + fDsfExecutor.execute(new DsfRunnable() { + public void run() { + command.run(); + } + }); + } + } + +} diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java index 634716b2e34..1c52b3099eb 100644 --- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java @@ -24,7 +24,7 @@ import org.eclipse.core.runtime.Status; * This cache requires an executor to use. The executor is used to synchronize * access to the cache state and data. * </p> - * @since 2.1 + * @since 2.2 */ @ConfinedToDsfExecutor("fExecutor") public abstract class RequestCache<V> extends AbstractCache<V> { @@ -36,21 +36,15 @@ public abstract class RequestCache<V> extends AbstractCache<V> { super(executor); } - /** - * Sub-classes should override this method to retrieve the cache data - * from its source. - * - * @param rm Request monitor for completion of data retrieval. - */ @Override - protected void retrieve() { + protected final void retrieve() { // Make sure to cancel the previous rm. This may lead to the rm being // canceled twice, but that's not harmful. if (fRm != null) { fRm.cancel(); } - fRm = new DataRequestMonitor<V>(getExecutor(), null) { + fRm = new DataRequestMonitor<V>(getImmediateInDsfExecutor(), null) { private IStatus fRawStatus = Status.OK_STATUS; @@ -95,7 +89,7 @@ public abstract class RequestCache<V> extends AbstractCache<V> { } @Override - public void reset(V data, IStatus status) { + protected void reset(V data, IStatus status) { if (fRm != null) { fRm.cancel(); fRm = null; @@ -104,7 +98,7 @@ public abstract class RequestCache<V> extends AbstractCache<V> { } @Override - public void disable() { + protected void disable() { if (fRm != null) { fRm.cancel(); fRm = null; @@ -113,7 +107,7 @@ public abstract class RequestCache<V> extends AbstractCache<V> { } @Override - public void set(V data, IStatus status) { + protected void set(V data, IStatus status) { if (fRm != null) { fRm.cancel(); fRm = null; diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java index 8de8d5ae07f..58ed79cbfe8 100644 --- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java @@ -122,7 +122,7 @@ public abstract class Transaction<V> { * @throws CoreException * if an error was encountered getting the data from the source */ - protected void validate(RequestCache<?> cache) throws InvalidCacheException, CoreException { + protected void validate(ICache<?> cache) throws InvalidCacheException, CoreException { if (cache.isValid()) { if (!cache.getStatus().isOK()) { throw new CoreException(cache.getStatus()); @@ -141,15 +141,22 @@ public abstract class Transaction<V> { } } + /** + * See {@link #validate(RequestCache)}. This variant simply validates + * multiple cache objects. + */ + protected void validate(RequestCache<?> ... caches) throws InvalidCacheException, CoreException { + } + /** * See {@link #validate(RequestCache)}. This variant simply validates * multiple cache objects. */ - protected void validate(RequestCache<?> ... caches) throws InvalidCacheException, CoreException { + protected void validate(Iterable<ICache<?>> caches) throws InvalidCacheException, CoreException { // Check if any of the caches have errors: boolean allValid = true; - for (RequestCache<?> cache : caches) { + for (ICache<?> cache : caches) { if (cache.isValid()) { if (!cache.getStatus().isOK()) { throw new CoreException(cache.getStatus()); @@ -169,7 +176,7 @@ public abstract class Transaction<V> { } }; int count = 0; - for (RequestCache<?> cache : caches) { + for (ICache<?> cache : caches) { if (!cache.isValid()) { cache.wait(countringRm); count++; diff --git a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java new file mode 100644 index 00000000000..b6455fd62dd --- /dev/null +++ b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java @@ -0,0 +1,721 @@ +/******************************************************************************* + * Copyright (c) 2006 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.cdt.tests.dsf.concurrent; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; + +import junit.framework.Assert; + +import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.DsfRunnable; +import org.eclipse.cdt.dsf.concurrent.IDsfStatusConstants; +import org.eclipse.cdt.dsf.concurrent.ImmediateExecutor; +import org.eclipse.cdt.dsf.concurrent.ImmediateInDsfExecutor; +import org.eclipse.cdt.dsf.concurrent.Query; +import org.eclipse.cdt.dsf.concurrent.RequestCache; +import org.eclipse.cdt.tests.dsf.TestDsfExecutor; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests that exercise the DataCache object. + */ +public class CacheTests { + + TestDsfExecutor fExecutor; + TestCache fTestCache; + DataRequestMonitor<Integer> fRetrieveRm; + + class TestCache extends RequestCache<Integer> { + + public TestCache() { + super(new ImmediateInDsfExecutor(fExecutor)); + } + + @Override + protected void retrieve(DataRequestMonitor<Integer> rm) { + synchronized(CacheTests.this) { + fRetrieveRm = rm; + CacheTests.this.notifyAll(); + } + } + + @Override + protected void reset() { + super.reset(); + } + + @Override + public void reset(Integer data, IStatus status) { + super.reset(data, status); + } + + @Override + public void disable() { + super.disable(); + } + + @Override + public void set(Integer data, IStatus status) { + super.set(data, status); + } + + } + + /** + * There's no rule on how quickly the cache has to start data retrieval + * after it has been requested. It could do it immediately, or it could + * wait a dispatch cycle, etc.. + */ + private void waitForRetrieveRm() { + synchronized(this) { + while (fRetrieveRm == null) { + try { + wait(); + } catch (InterruptedException e) { + return; + } + } + } + } + + @Before + public void startExecutor() throws ExecutionException, InterruptedException { + fExecutor = new TestDsfExecutor(); + fTestCache = new TestCache(); + } + + @After + public void shutdownExecutor() throws ExecutionException, InterruptedException { + fExecutor.submit(new DsfRunnable() { public void run() { + fExecutor.shutdown(); + }}).get(); + if (fExecutor.exceptionsCaught()) { + Throwable[] exceptions = fExecutor.getExceptions(); + throw new ExecutionException(exceptions[0]); + } + fRetrieveRm = null; + fTestCache = null; + fExecutor = null; + } + + private void assertCacheValidWithData(Object data) { + Assert.assertTrue(fTestCache.isValid()); + Assert.assertEquals(data, fTestCache.getData()); + Assert.assertTrue(fTestCache.getStatus().isOK()); + } + + private void assertCacheResetWithoutData() { + Assert.assertFalse(fTestCache.isValid()); + Assert.assertEquals(null, fTestCache.getData()); + Assert.assertFalse(fTestCache.getStatus().isOK()); + Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE); + } + + private void assertCacheDisabledWithoutData() { + Assert.assertTrue(fTestCache.isValid()); + Assert.assertEquals(null, fTestCache.getData()); + Assert.assertFalse(fTestCache.getStatus().isOK()); + Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE); + } + + private void assertCacheWaiting() { + Assert.assertFalse(fTestCache.isValid()); + Assert.assertEquals(null, fTestCache.getData()); + Assert.assertFalse(fTestCache.getStatus().isOK()); + Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE); + Assert.assertFalse(fRetrieveRm.isCanceled()); + } + + private void assertCacheCanceled() { + Assert.assertFalse(fTestCache.isValid()); + Assert.assertEquals(null, fTestCache.getData()); + Assert.assertFalse(fTestCache.getStatus().isOK()); + Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE); + Assert.assertTrue(fRetrieveRm.isCanceled()); + } + + @Test + public void getWithCompletionInDsfThreadTest() throws InterruptedException, ExecutionException { + // Request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + // Check initial state + Assert.assertFalse(fTestCache.isValid()); + Assert.assertFalse(fTestCache.getStatus().isOK()); + Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE); + + fExecutor.execute(q); + + // Wait until the cache requests the data. + waitForRetrieveRm(); + + // Check state while waiting for data + Assert.assertFalse(fTestCache.isValid()); + + // Complete the cache's retrieve data request. + fExecutor.submit(new Callable<Object>() { public Object call() { + fRetrieveRm.setData(1); + fRetrieveRm.done(); + + // Check that the data is available in the cache immediately + // (in the same dispatch cycle). + Assert.assertEquals(1, (int)fTestCache.getData()); + Assert.assertTrue(fTestCache.isValid()); + + return null; + }}).get(); + + Assert.assertEquals(1, (int)q.get()); + + // Re-check final state + assertCacheValidWithData(1); + } + + @Test + public void getTest() throws InterruptedException, ExecutionException { + // Check initial state + Assert.assertFalse(fTestCache.isValid()); + + // Request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Check state while waiting for data + Assert.assertFalse(fTestCache.isValid()); + + // Set the data without using an executor. + fRetrieveRm.setData(1); + fRetrieveRm.done(); + + Assert.assertEquals(1, (int)q.get()); + + // Check final state + assertCacheValidWithData(1); + } + + @Test + public void getTestWithTwoClients() throws InterruptedException, ExecutionException { + // Check initial state + Assert.assertFalse(fTestCache.isValid()); + + // Request data from cache + Query<Integer> q1 = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q1); + + // Request data from cache again + Query<Integer> q2 = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q2); + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Check state while waiting for data + Assert.assertFalse(fTestCache.isValid()); + + // Set the data without using an executor. + fRetrieveRm.setData(1); + fRetrieveRm.done(); + + Assert.assertEquals(1, (int)q1.get()); + Assert.assertEquals(1, (int)q2.get()); + + // Check final state + assertCacheValidWithData(1); + } + + @Test + public void getTestWithManyClients() throws InterruptedException, ExecutionException { + // Check initial state + Assert.assertFalse(fTestCache.isValid()); + + // Request data from cache + List<Query<Integer>> qList = new ArrayList<Query<Integer>>(); + for (int i = 0; i < 10; i++) { + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + qList.add(q); + } + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Check state while waiting for data + Assert.assertFalse(fTestCache.isValid()); + + // Set the data without using an executor. + fRetrieveRm.setData(1); + fRetrieveRm.done(); + + for (Query<Integer> q : qList) { + Assert.assertEquals(1, (int)q.get()); + } + + // Check final state + assertCacheValidWithData(1); + } + + @Test + public void disableBeforeRequestTest() throws InterruptedException, ExecutionException { + // Disable the cache with a given value + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.disable(); + } + }).get(); + + assertCacheDisabledWithoutData(); + + // Try to request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + + Thread.sleep(100); + + // Retrieval should never have been made. + Assert.assertEquals(null, fRetrieveRm); + + try { + Assert.assertEquals(null, q.get()); + } catch (ExecutionException e) { + // expected the exception + return; + } + Assert.fail("expected an exeption"); + } + + @Test + public void disableWhilePendingTest() throws InterruptedException, ExecutionException { + // Request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + + // Disable the cache with a given value + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.disable(); + } + }).get(); + + assertCacheDisabledWithoutData(); + + // Completed the retrieve RM + fExecutor.submit(new DsfRunnable() { + public void run() { + fRetrieveRm.setData(1); + fRetrieveRm.done(); + } + }).get(); + + // Validate that cache is still disabled without data. + assertCacheDisabledWithoutData(); + } + + @Test + public void disableWhileValidTest() throws InterruptedException, ExecutionException { + // Request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Complete the request + fRetrieveRm.setData(1); + fRetrieveRm.done(); + + q.get(); + + // Disable cache + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.disable(); + } + }).get(); + + // Check final state + assertCacheValidWithData(1); + } + + @Test + public void disableWithValueTest() throws InterruptedException, ExecutionException { + // Disable the cache with a given value + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.set(2, Status.OK_STATUS); + } + }).get(); + + // Validate that cache is disabled without data. + assertCacheValidWithData(2); + } + + + @Test + public void resetBeforeRequestTest() throws InterruptedException, ExecutionException { + // Disable the cache with a given value + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.reset(); + } + }).get(); + + assertCacheResetWithoutData(); + + // Try to request data from cache (check that cache still works normally) + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Complete the request + fRetrieveRm.setData(1); + fRetrieveRm.done(); + + // Check result + Assert.assertEquals(1, (int)q.get()); + + assertCacheValidWithData(1); + } + + @Test + public void resetWhilePendingTest() throws InterruptedException, ExecutionException { + // Request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Disable the cache with a given value + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.reset(); + } + }).get(); + + assertCacheResetWithoutData(); + + // Completed the retrieve RM + fExecutor.submit(new DsfRunnable() { + public void run() { + fRetrieveRm.setData(1); + fRetrieveRm.done(); + } + }).get(); + + // Validate that cache is still disabled without data. + assertCacheResetWithoutData(); + } + + @Test + public void cancelWhilePendingTest() throws InterruptedException, ExecutionException { + // Request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Cancel the client request + q.cancel(true); + try { + q.get(); + Assert.fail("Expected a cancellation exception"); + } catch (CancellationException e) {} // Expected exception; + + assertCacheCanceled(); + + // Completed the retrieve RM + fExecutor.submit(new DsfRunnable() { + public void run() { + fRetrieveRm.setData(1); + fRetrieveRm.done(); + } + }).get(); + + // Validate that cache accepts the canceled request data + assertCacheValidWithData(1); + } + + @Test + public void cancelWhilePendingWithoutClientNotificationTest() throws InterruptedException, ExecutionException { + // Request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(new DataRequestMonitor<Integer>(ImmediateExecutor.getInstance(), rm) { + @Override + public synchronized void addCancelListener(ICanceledListener listener) { + // Do not add the cancel listener so that the cancel request is not + // propagated to the cache. + } + }); + } + }; + fExecutor.execute(q); + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Cancel the client request + q.cancel(true); + + assertCacheCanceled(); + + try { + q.get(); + Assert.fail("Expected a cancellation exception"); + } catch (CancellationException e) {} // Expected exception; + + // Completed the retrieve RM + fExecutor.submit(new DsfRunnable() { + public void run() { + fRetrieveRm.setData(1); + fRetrieveRm.done(); + } + }).get(); + + // Validate that cache accepts the canceled request data + assertCacheValidWithData(1); + } + + @Test + public void cancelWhilePendingWithTwoClientsTest() throws InterruptedException, ExecutionException { + // Request data from cache + Query<Integer> q1 = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q1); + + // Request data from cache again + Query<Integer> q2 = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q2); + + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Cancel the first client request + q1.cancel(true); + try { + q1.get(); + Assert.fail("Expected a cancellation exception"); + } catch (CancellationException e) {} // Expected exception; + assertCacheWaiting(); + + // Cancel the second request + q2.cancel(true); + try { + q2.get(); + Assert.fail("Expected a cancellation exception"); + } catch (CancellationException e) {} // Expected exception; + + assertCacheCanceled(); + + // Completed the retrieve RM + fExecutor.submit(new DsfRunnable() { + public void run() { + fRetrieveRm.setData(1); + fRetrieveRm.done(); + } + }).get(); + + // Validate that cache accepts the canceled request data + assertCacheValidWithData(1); + } + + @Test + public void cancelWhilePendingWithManyClientsTest() throws InterruptedException, ExecutionException { + // Request data from cache + List<Query<Integer>> qList = new ArrayList<Query<Integer>>(); + for (int i = 0; i < 10; i++) { + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + qList.add(q); + } + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Cancel some client requests + int[] toCancel = new int[] { 0, 2, 5, 9}; + for (int i = 0; i < toCancel.length; i++) { + + // Cancel request and verify that its canceled + Query<Integer> q = qList.get(toCancel[i]); + q.cancel(true); + try { + q.get(); + Assert.fail("Expected a cancellation exception"); + } catch (CancellationException e) {} // Expected exception; + qList.set(toCancel[i], null); + + assertCacheWaiting(); + } + + // Replace canceled requests with new ones + for (int i = 0; i < toCancel.length; i++) { + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + qList.set(toCancel[i], q); + assertCacheWaiting(); + } + + // Now cancel all requests + for (int i = 0; i < (qList.size() - 1); i++) { + // Validate that cache is still waiting and is not canceled + assertCacheWaiting(); + qList.get(i).cancel(true); + } + qList.get(qList.size() - 1).cancel(true); + assertCacheCanceled(); + + // Completed the retrieve RM + fExecutor.submit(new DsfRunnable() { + public void run() { + fRetrieveRm.setData(1); + fRetrieveRm.done(); + } + }).get(); + + // Validate that cache accepts the canceled request data + assertCacheValidWithData(1); + } + + @Test + public void resetWhileValidTest() throws InterruptedException, ExecutionException { + // Request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.request(rm); + } + }; + fExecutor.execute(q); + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + // Complete the request + fRetrieveRm.setData(1); + fRetrieveRm.done(); + + q.get(); + + // Disable cache + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.reset(); + } + }).get(); + + // Check final state + assertCacheResetWithoutData(); + } + + @Test + public void resetWithValueTest() throws InterruptedException, ExecutionException { + // Disable the cache with a given value + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.reset(2, Status.OK_STATUS); + } + }).get(); + + // Validate that cache is disabled without data. + Assert.assertFalse(fTestCache.isValid()); + Assert.assertEquals(2, (int)fTestCache.getData()); + Assert.assertTrue(fTestCache.getStatus().isOK()); + } +} diff --git a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/TransactionTests.java b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/TransactionTests.java new file mode 100644 index 00000000000..1b53c51bc30 --- /dev/null +++ b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/TransactionTests.java @@ -0,0 +1,178 @@ +/******************************************************************************* + * Copyright (c) 2006 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.cdt.tests.dsf.concurrent; + +import java.util.Arrays; +import java.util.concurrent.ExecutionException; + +import junit.framework.Assert; + +import org.eclipse.cdt.dsf.concurrent.RequestCache; +import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.DsfRunnable; +import org.eclipse.cdt.dsf.concurrent.ImmediateInDsfExecutor; +import org.eclipse.cdt.dsf.concurrent.Query; +import org.eclipse.cdt.dsf.concurrent.Transaction; +import org.eclipse.cdt.tests.dsf.TestDsfExecutor; +import org.eclipse.core.runtime.CoreException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests that exercise the Transaction object. + */ +public class TransactionTests { + final static private int NUM_CACHES = 5; + + TestDsfExecutor fExecutor; + TestCache[] fTestCaches = new TestCache[NUM_CACHES]; + DataRequestMonitor<?>[] fRetrieveRms = new DataRequestMonitor<?>[NUM_CACHES]; + + class TestCache extends RequestCache<Integer> { + + final private int fIndex; + + public TestCache(int index) { + super(new ImmediateInDsfExecutor(fExecutor)); + fIndex = index; + } + + @Override + protected void retrieve(DataRequestMonitor<Integer> rm) { + synchronized(TransactionTests.this) { + fRetrieveRms[fIndex] = rm; + TransactionTests.this.notifyAll(); + } + } + + } + + class TestSingleTransaction extends Transaction<Integer> { + + @Override + protected Integer process() throws InvalidCacheException, CoreException { + validate(fTestCaches[0]); + return fTestCaches[0].getData(); + } + } + + class TestSumTransaction extends Transaction<Integer> { + @Override + protected Integer process() throws InvalidCacheException, CoreException { + validate(fTestCaches); + + int sum = 0; + for (RequestCache<Integer> cache : fTestCaches) { + sum += cache.getData(); + } + return sum; + } + } + + /** + * There's no rule on how quickly the cache has to start data retrieval + * after it has been requested. It could do it immediately, or it could + * wait a dispatch cycle, etc.. + */ + private void waitForRetrieveRm(boolean all) { + synchronized(this) { + if (all) { + while (Arrays.asList(fRetrieveRms).contains(null)) { + try { + wait(); + } catch (InterruptedException e) { + return; + } + } + } else { + while (fRetrieveRms[0] == null) { + try { + wait(); + } catch (InterruptedException e) { + return; + } + } + } + } + } + + @Before + public void startExecutor() throws ExecutionException, InterruptedException { + fExecutor = new TestDsfExecutor(); + for (int i = 0; i < fTestCaches.length; i++) { + fTestCaches[i] = new TestCache(i); + } + } + + @After + public void shutdownExecutor() throws ExecutionException, InterruptedException { + fExecutor.submit(new DsfRunnable() { public void run() { + fExecutor.shutdown(); + }}).get(); + if (fExecutor.exceptionsCaught()) { + Throwable[] exceptions = fExecutor.getExceptions(); + throw new ExecutionException(exceptions[0]); + } + fRetrieveRms = new DataRequestMonitor<?>[NUM_CACHES]; + fTestCaches = new TestCache[NUM_CACHES]; + fExecutor = null; + } + + @Test + public void singleTransactionTest() throws InterruptedException, ExecutionException { + final TestSingleTransaction testTransaction = new TestSingleTransaction(); + // Request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + testTransaction.request(rm); + } + }; + fExecutor.execute(q); + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(false); + + // Set the data without using an executor. + ((DataRequestMonitor<Integer>)fRetrieveRms[0]).setData(1); + fRetrieveRms[0].done(); + + Assert.assertEquals(1, (int)q.get()); + } + + @Test + public void sumTransactionTest() throws InterruptedException, ExecutionException { + + final TestSumTransaction testTransaction = new TestSumTransaction(); + // Request data from cache + Query<Integer> q = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + testTransaction.request(rm); + } + }; + fExecutor.execute(q); + + // Wait until the cache starts data retrieval. + waitForRetrieveRm(true); + + // Set the data without using an executor. + for (DataRequestMonitor<?> rm : fRetrieveRms) { + ((DataRequestMonitor<Integer>)rm).setData(1); + rm.done(); + } + + fExecutor.execute(q); + Assert.assertEquals(NUM_CACHES, (int)q.get()); + } + +} |