diff options
author | Pawel Piech | 2010-10-19 21:20:56 +0000 |
---|---|---|
committer | Pawel Piech | 2010-10-19 21:20:56 +0000 |
commit | a56f1006bda417474544e566a577a1778fba41a4 (patch) | |
tree | 6418c5b63ef7be010d4bd2b6f5d36d0572165a13 /dsf | |
parent | 432a6010a8551166483e3975dc9d70950ea4cc45 (diff) | |
download | org.eclipse.cdt-a56f1006bda417474544e566a577a1778fba41a4.tar.gz org.eclipse.cdt-a56f1006bda417474544e566a577a1778fba41a4.tar.xz org.eclipse.cdt-a56f1006bda417474544e566a577a1778fba41a4.zip |
Bug 310345 - [concurrent] Asynchronous Cache Programming Model (ACPM) utilities for DSF
Diffstat (limited to 'dsf')
6 files changed, 975 insertions, 188 deletions
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java index f6da42170a8..c3827309c68 100644 --- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java @@ -28,7 +28,6 @@ import org.eclipse.core.runtime.Status; * This cache requires an executor to use. The executor is used to synchronize * access to the cache state and data. * </p> - * * @since 2.2 */ @ConfinedToDsfExecutor("fExecutor") @@ -69,7 +68,7 @@ public abstract class AbstractCache<V> implements ICache<V> { protected ImmediateInDsfExecutor getImmediateInDsfExecutor() { return fExecutor; } - + /** * Sub-classes should override this method to retrieve the cache data from * its source. The implementation should call {@link #set(Object, IStatus)} @@ -83,8 +82,15 @@ public abstract class AbstractCache<V> implements ICache<V> { /** - * Called while holding a lock to "this". No new request will start until - * this call returns. + * Called to cancel a retrieve request. This method is called when + * clients of the cache no longer need data that was requested. <br> + * Sub-classes should cancel and clean up requests to the asynchronous + * data source. + * + * <p> + * Note: Called while holding a lock to "this". No new request will start until + * this call returns. + * </p> */ @ThreadSafe abstract protected void canceled(); @@ -94,10 +100,16 @@ public abstract class AbstractCache<V> implements ICache<V> { } public V getData() { + if (!fValid) { + throw new IllegalStateException("Cache is not valid. Cache data can be read only when cache is valid."); //$NON-NLS-1$ + } return fData; } public IStatus getStatus() { + if (!fValid) { + throw new IllegalStateException("Cache is not valid. Cache status can be read only when cache is valid."); //$NON-NLS-1$ + } return fStatus; } @@ -143,33 +155,6 @@ public abstract class AbstractCache<V> implements ICache<V> { } } - private void doSet(V data, IStatus status, boolean valid) { - assert fExecutor.getDsfExecutor().isInExecutorThread(); - - fData = data; - fStatus = status; - fValid = valid; - - Object waiting = null; - synchronized(this) { - waiting = fWaitingList; - fWaitingList = null; - } - if (waiting != null) { - if (waiting instanceof RequestMonitor) { - completeWaitingRm((RequestMonitor)waiting); - } else if (waiting instanceof RequestMonitor[]) { - RequestMonitor[] waitingList = (RequestMonitor[])waiting; - for (int i = 0; i < waitingList.length; i++) { - if (waitingList[i] != null) { - completeWaitingRm(waitingList[i]); - } - } - } - waiting = null; - } - } - private void completeWaitingRm(RequestMonitor rm) { if (rm instanceof DataRequestMonitor<?>) { @SuppressWarnings("unchecked") @@ -200,14 +185,14 @@ public abstract class AbstractCache<V> implements ICache<V> { waiting = waiting || waitingList[i] != null; } } - if (/*found && */!waiting) { + if (found && !waiting) { canceled(); } } // If we have no clients waiting anymore, cancel the request if (found) { - // We no longer need to listen to cancelations. + // We no longer need to listen to cancellations. rm.removeCancelListener(fRequestCanceledListener); rm.setStatus(Status.CANCEL_STATUS); rm.done(); @@ -253,60 +238,60 @@ public abstract class AbstractCache<V> implements ICache<V> { return canceled; } - - /** - * Resets the cache, setting its data to <code>null</code>, and status to - * {@link #INVALID_STATUS}. Equivalent to reset(null, INVALID_STATUS) - * - * @see #reset(Object, IStatus) - */ + + /** + * Resets the cache with a data value <code>null</code> and an error + * status with code {@link IDsfStatusConstants#INVALID_STATE}. + * + * @see #reset(Object, IStatus) + */ protected void reset() { - reset(null, INVALID_STATUS); - } - - /** - * Resets the cache, setting its data to [data], and status to [status]. - * Resetting the cache puts it in the invalid state and cancels any current - * pending requests to the data source. - * - * <p> - * The cache should be reset when the data source has issued an event - * indicating that the source data has changed but data may still be - * retrieved. Clients may need to re-request data following a cache reset. - * - * @param data - * The data that should be returned to any client that calls - * {@link #getData()} despite the invalid state - * @status The status that should be returned to any client that calls - * {@link #getStatus()()} despite the invalid state - * @see #reset() - * @see #set(Object, IStatus) - */ - protected void reset(V data, IStatus status) { - doSet(data, status, false); + if (!fValid) { + throw new IllegalStateException("Cache is not valid. Cache can be reset only when it's in a valid state"); //$NON-NLS-1$ + } + fValid = false; } - /** - * Puts the cache into the valid state, given it new data and status. - * - * This method should be called when the subclass has received a response - * for updated data from the source. Note that such a response may be an - * error. That does not make the cache invalid. Invalid strictly means that - * the cache's data has either gone stale or that it's in the initial unset - * state. - * - * @param data - * The data that should be returned to any clients waiting for - * cache data and for clients requesting data, until the cache is - * invalidated via one of the reset methods. - * @status The status that should be returned to any clients waiting for - * cache data and for clients requesting status, until the cache is - * invalidated via one of the reset methods. - * - * @see #reset() - * @see #reset(Object, IStatus) - */ + /** + * Resets the cache then disables it. When a cache is disabled it means + * that it is valid and requests to the data source will not be sent. + * <p> + * This method should be called when the data source has issued an event + * indicating that the source data has changed and future requests for + * data will return the given data and status. Once the source data + * becomes available again, clients should call {@link #reset()}. + * </p> + * @param data The data that should be returned to any clients waiting for + * cache data and for clients requesting data until the cache is reset again. + * @status The status that should be returned to any clients waiting for + * cache data and for clients requesting data until the cache is reset again. + * + * @see #reset(Object, IStatus) + */ protected void set(V data, IStatus status) { - doSet(data, status, true); + assert fExecutor.getDsfExecutor().isInExecutorThread(); + + fData = data; + fStatus = status; + fValid = true; + + Object waiting = null; + synchronized(this) { + waiting = fWaitingList; + fWaitingList = null; + } + if (waiting != null) { + if (waiting instanceof RequestMonitor) { + completeWaitingRm((RequestMonitor)waiting); + } else if (waiting instanceof RequestMonitor[]) { + RequestMonitor[] waitingList = (RequestMonitor[])waiting; + for (int i = 0; i < waitingList.length; i++) { + if (waitingList[i] != null) { + completeWaitingRm(waitingList[i]); + } + } + } + waiting = null; + } } } diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java new file mode 100644 index 00000000000..f65e1bef08f --- /dev/null +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java @@ -0,0 +1,286 @@ +/******************************************************************************* + * Copyright (c) 2010 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.cdt.dsf.concurrent; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.eclipse.core.runtime.CoreException; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; + +/** + * Cache for retrieving ranges of elements from an asynchronous data source. + * Clients of this cache should call {@link #getRange(long, int)} to get a cache + * for that given range of elements. Sub-classes must implement {@link #retrieve(long, int, DataRequestMonitor)} + * to retrieve data from the asynchronous data source. + * @since 2.2 + */ +abstract public class RangeCache<V> { + + private class Request extends RequestCache<List<V>> implements Comparable<Request> { + long fOffset; + int fCount; + @Override + protected void retrieve(DataRequestMonitor<java.util.List<V>> rm) { + RangeCache.this.retrieve(fOffset, fCount, rm); + } + + Request(long offset, int count) { + super(fExecutor); + fOffset = offset; + fCount = count; + } + + public int compareTo(RangeCache<V>.Request o) { + if (fOffset > o.fOffset) { + return 1; + } else if (fOffset == o.fOffset) { + return 0; + } else /*if (fOffset < o.fOffset)*/ { + return -1; + } + } + + @Override + public boolean equals(Object _o) { + if (_o instanceof RangeCache<?>.Request) { + RangeCache<?>.Request o = (RangeCache<?>.Request)_o; + return fOffset == o.fOffset && fCount == o.fCount; + } + return false; + } + + @Override + public int hashCode() { + return (int)fOffset^fCount; + } + + @Override + public String toString() { + return "" + fOffset + "(" + fCount + ") -> " + (fOffset + fCount); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ + } + } + + /** + * This transaction class implements the main logic of the range cache. + * It examines the current requests held by the cache and and creates + * requests ones as needed. Once the requests are all valid it returns + * the completed data to the client. + */ + private class RangeTransaction extends Transaction<List<V>> { + + long fOffset; + int fCount; + + RangeTransaction(long offset, int count) { + fOffset = offset; + fCount = count; + } + + @Override + protected List<V> process() throws InvalidCacheException, CoreException { + clearCanceledRequests(); + + List<ICache<?>> transactionRequests = new ArrayList<ICache<?>>(1); + + // Create a new request for the data to retrieve. + Request current = new Request(fOffset, fCount); + + current = adjustRequestHead(current, transactionRequests); + if (current != null) { + current = adjustRequestTail(current, transactionRequests); + } + if (current != null) { + transactionRequests.add(current); + fRequests.add(current); + } + + validate(transactionRequests); + + return makeElementsListFromRequests(transactionRequests); + } + + + // Adjust the beginning of the requested range of data. If there + // is already an overlapping range in front of the requested range, + // then use it. + private Request adjustRequestHead(Request request, List<ICache<?>> transactionRequests) { + SortedSet<Request> headRequests = fRequests.headSet(request); + if (!headRequests.isEmpty()) { + Request headRequest = headRequests.last(); + long headEndOffset = headRequest.fOffset + headRequest.fCount; + if (headEndOffset > fOffset) { + transactionRequests.add(headRequest); + request.fCount = (int)(request.fCount - (headEndOffset - fOffset)); + request.fOffset = headEndOffset; + } + } + if (request.fCount > 0) { + return request; + } else { + return null; + } + } + + /** + * Adjust the end of the requested range of data. + * @param current + * @param transactionRequests + * @return + */ + private Request adjustRequestTail(Request current, List<ICache<?>> transactionRequests) { + // Create a duplicate of the tailSet, in order to avoid a concurrent modification exception. + List<Request> tailSet = new ArrayList<Request>(fRequests.tailSet(current)); + + // Iterate through the matching requests and add them to the requests list. + for (Request tailRequest : tailSet) { + if (tailRequest.fOffset < current.fOffset + fCount) { + // found overlapping request add it to list + if (tailRequest.fOffset <= current.fOffset) { + // next request starts off at the beginning of current request + transactionRequests.add(tailRequest); + current.fOffset = tailRequest.fOffset + tailRequest.fCount; + current.fCount = ((int)(fOffset - current.fOffset)) + fCount ; + if (current.fCount <= 0) { + return null; + } + } else { + current.fCount = (int)(tailRequest.fOffset - current.fOffset); + transactionRequests.add(current); + fRequests.add(current); + current = null; + transactionRequests.add(tailRequest); + long tailEndOffset = tailRequest.fOffset + tailRequest.fCount; + long rangeEndOffset = fOffset + fCount; + if (tailEndOffset >= rangeEndOffset) { + return null; + } else { + current = new Request(tailEndOffset, (int)(rangeEndOffset - tailEndOffset)); + } + } + } else { + break; + } + } + return current; + } + + private List<V> makeElementsListFromRequests(List<ICache<?>> requests) { + List<V> retVal = new ArrayList<V>(fCount); + long index = fOffset; + long end = fOffset + fCount; + int requestIdx = 0; + while (index < end ) { + @SuppressWarnings("unchecked") + Request request = (Request)requests.get(requestIdx); + if (index < request.fOffset + request.fCount) { + retVal.add( request.getData().get((int)(index - request.fOffset)) ); + index ++; + } else { + requestIdx++; + } + } + return retVal; + } + + private void clearCanceledRequests() { + for (Iterator<Request> itr = fRequests.iterator(); itr.hasNext();) { + Request request = itr.next(); + if (!request.isValid() && request.isCanceled()) { + itr.remove(); + } + } + } + } + + private final ImmediateInDsfExecutor fExecutor; + + /** + * Requests currently held by this cache. The requests should be for + * non-overlapping ranges of elements. + */ + + private SortedSet<Request> fRequests = new TreeSet<Request>(); + + public RangeCache(ImmediateInDsfExecutor executor) { + fExecutor = executor; + } + + /** + * Retrieves data from the data source. + * + * @param offset Offset in data range where the requested list of data should start. + * @param count Number of elements requests. + * @param rm Callback for the data. + */ + protected abstract void retrieve(long offset, int count, DataRequestMonitor<List<V>> rm); + + /** + * Returns a cache for the range of requested data. + * + * @param offset Offset in data range where the requested list of data should start. + * @param count Number of elements requests. + * @return Cache object for the requested data. + */ + public ICache<List<V>> getRange(final long offset, final int count) { + assert fExecutor.getDsfExecutor().isInExecutorThread(); + + return new RequestCache<List<V>>(fExecutor) { + @Override + protected void retrieve(DataRequestMonitor<List<V>> rm) { + new RangeTransaction(offset, count).request(rm); + } + }; + } + + /** + * Sets the given list and status to the cache. Subsequent range requests + * that fall in its the range will return the given data. Requests outside + * of its range will trigger a call to {@link #retrieve(long, int, DataRequestMonitor)}.<br> + * The given data parameter can be <code>null</code> if the given status + * parameter contains an error. In this case all requests in the given + * range will return the error. + * + * @param offset Offset of the given data to set to cache. + * @param count Count of the given data to set to cache. + * @param data List of elements to set to cache. Can be <code>null</code>. + * @param status Status object to set to cache. + */ + protected void set(long offset, int count, List<V> data, IStatus status) { + for (Request request : fRequests) { + if (!request.isValid()) { + request.set(null, Status.OK_STATUS); + } + } + fRequests.clear(); + Request request = new Request(offset, count); + request.set(data, status); + fRequests.add(request); + } + + /** + * Forces the cache into an invalid state. If there are any pending + * requests, their will continue and their results will be cached. + */ + protected void reset() { + for (Iterator<Request> itr = fRequests.iterator(); itr.hasNext();) { + Request request = itr.next(); + if (request.isValid()) { + request.reset(); + itr.remove(); + } + } + } +} diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java index 8b8d157eaca..dbf8f669b31 100644 --- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java @@ -89,15 +89,6 @@ public abstract class RequestCache<V> extends AbstractCache<V> { } @Override - protected void reset(V data, IStatus status) { - if (fRm != null) { - fRm.cancel(); - fRm = null; - } - super.reset(data, status); - } - - @Override protected void set(V data, IStatus status) { if (fRm != null) { fRm.cancel(); diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java index b6c78264a1e..01bc5d9d6c0 100644 --- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java +++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java @@ -10,6 +10,8 @@ *******************************************************************************/ package org.eclipse.cdt.dsf.concurrent; +import java.util.Arrays; + import org.eclipse.core.runtime.CoreException; /** @@ -147,7 +149,7 @@ public abstract class Transaction<V> { * multiple cache objects. */ protected void validate(ICache<?> ... caches) throws InvalidCacheException, CoreException { - validate(caches); + validate(Arrays.asList(caches)); } /** diff --git a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java index 6923e5b1e3e..939b3912b6a 100644 --- a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java +++ b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/CacheTests.java @@ -61,11 +61,6 @@ public class CacheTests { } @Override - public void reset(Integer data, IStatus status) { - super.reset(data, status); - } - - @Override public void set(Integer data, IStatus status) { super.set(data, status); } @@ -117,6 +112,18 @@ public class CacheTests { private void assertCacheResetWithoutData() { Assert.assertFalse(fTestCache.isValid()); + try { + fTestCache.getData(); + Assert.fail("Expected an IllegalStateException"); + } catch (IllegalStateException e) {} + try { + fTestCache.getStatus(); + Assert.fail("Expected an IllegalStateException"); + } catch (IllegalStateException e) {} + } + + private void assertCacheDisabledWithoutData() { + Assert.assertTrue(fTestCache.isValid()); Assert.assertEquals(null, fTestCache.getData()); Assert.assertFalse(fTestCache.getStatus().isOK()); Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE); @@ -124,17 +131,27 @@ public class CacheTests { private void assertCacheWaiting() { Assert.assertFalse(fTestCache.isValid()); - Assert.assertEquals(null, fTestCache.getData()); - Assert.assertFalse(fTestCache.getStatus().isOK()); - Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE); + try { + fTestCache.getData(); + Assert.fail("Expected an IllegalStateException"); + } catch (IllegalStateException e) {} + try { + fTestCache.getStatus(); + Assert.fail("Expected an IllegalStateException"); + } catch (IllegalStateException e) {} Assert.assertFalse(fRetrieveRm.isCanceled()); } private void assertCacheCanceled() { Assert.assertFalse(fTestCache.isValid()); - Assert.assertEquals(null, fTestCache.getData()); - Assert.assertFalse(fTestCache.getStatus().isOK()); - Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE); + try { + fTestCache.getData(); + Assert.fail("Expected an IllegalStateException"); + } catch (IllegalStateException e) {} + try { + fTestCache.getStatus(); + Assert.fail("Expected an IllegalStateException"); + } catch (IllegalStateException e) {} Assert.assertTrue(fRetrieveRm.isCanceled()); } @@ -149,8 +166,6 @@ public class CacheTests { }; // Check initial state Assert.assertFalse(fTestCache.isValid()); - Assert.assertFalse(fTestCache.getStatus().isOK()); - Assert.assertEquals(fTestCache.getStatus().getCode(), IDsfStatusConstants.INVALID_STATE); fExecutor.execute(q); @@ -284,6 +299,103 @@ public class CacheTests { assertCacheValidWithData(1); } +// @Test +// public void disableBeforeRequestTest() throws InterruptedException, ExecutionException { +// // Disable the cache with a given value +// fExecutor.submit(new DsfRunnable() { +// public void run() { +// fTestCache.disable(); +// } +// }).get(); +// +// assertCacheDisabledWithoutData(); +// +// // Try to request data from cache +// Query<Integer> q = new Query<Integer>() { +// @Override +// protected void execute(DataRequestMonitor<Integer> rm) { +// fTestCache.request(rm); +// } +// }; +// fExecutor.execute(q); +// +// Thread.sleep(100); +// +// // Retrieval should never have been made. +// Assert.assertEquals(null, fRetrieveRm); +// +// try { +// Assert.assertEquals(null, q.get()); +// } catch (ExecutionException e) { +// // expected the exception +// return; +// } +// Assert.fail("expected an exeption"); +// } +// +// @Test +// public void disableWhilePendingTest() throws InterruptedException, ExecutionException { +// // Request data from cache +// Query<Integer> q = new Query<Integer>() { +// @Override +// protected void execute(DataRequestMonitor<Integer> rm) { +// fTestCache.request(rm); +// } +// }; +// fExecutor.execute(q); +// +// // Disable the cache with a given value +// fExecutor.submit(new DsfRunnable() { +// public void run() { +// fTestCache.disable(); +// } +// }).get(); +// +// assertCacheDisabledWithoutData(); +// +// // Completed the retrieve RM +// fExecutor.submit(new DsfRunnable() { +// public void run() { +// fRetrieveRm.setData(1); +// fRetrieveRm.done(); +// } +// }).get(); +// +// // Validate that cache is still disabled without data. +// assertCacheDisabledWithoutData(); +// } +// +// @Test +// public void disableWhileValidTest() throws InterruptedException, ExecutionException { +// // Request data from cache +// Query<Integer> q = new Query<Integer>() { +// @Override +// protected void execute(DataRequestMonitor<Integer> rm) { +// fTestCache.request(rm); +// } +// }; +// fExecutor.execute(q); +// +// // Wait until the cache starts data retrieval. +// waitForRetrieveRm(); +// +// // Complete the request +// fRetrieveRm.setData(1); +// fRetrieveRm.done(); +// +// q.get(); +// +// // Disable cache +// fExecutor.submit(new DsfRunnable() { +// public void run() { +// fTestCache.disable(); +// } +// }).get(); +// +// // Check final state +// assertCacheValidWithData(1); +// } + @Test public void disableWithValueTest() throws InterruptedException, ExecutionException { // Disable the cache with a given value @@ -297,42 +409,9 @@ public class CacheTests { assertCacheValidWithData(2); } - - @Test - public void resetBeforeRequestTest() throws InterruptedException, ExecutionException { - // Disable the cache with a given value - fExecutor.submit(new DsfRunnable() { - public void run() { - fTestCache.reset(); - } - }).get(); - - assertCacheResetWithoutData(); - - // Try to request data from cache (check that cache still works normally) - Query<Integer> q = new Query<Integer>() { - @Override - protected void execute(DataRequestMonitor<Integer> rm) { - fTestCache.update(rm); - } - }; - fExecutor.execute(q); - - // Wait until the cache starts data retrieval. - waitForRetrieveRm(); - - // Complete the request - fRetrieveRm.setData(1); - fRetrieveRm.done(); - - // Check result - Assert.assertEquals(1, (int)q.get()); - - assertCacheValidWithData(1); - } @Test - public void resetWhilePendingTest() throws InterruptedException, ExecutionException { + public void cancelWhilePendingTest() throws InterruptedException, ExecutionException { // Request data from cache Query<Integer> q = new Query<Integer>() { @Override @@ -344,16 +423,16 @@ public class CacheTests { // Wait until the cache starts data retrieval. waitForRetrieveRm(); + + // Cancel the client request + q.cancel(true); + try { + q.get(); + Assert.fail("Expected a cancellation exception"); + } catch (CancellationException e) {} // Expected exception; - // Disable the cache with a given value - fExecutor.submit(new DsfRunnable() { - public void run() { - fTestCache.reset(); - } - }).get(); - - assertCacheResetWithoutData(); - + assertCacheCanceled(); + // Completed the retrieve RM fExecutor.submit(new DsfRunnable() { public void run() { @@ -361,18 +440,24 @@ public class CacheTests { fRetrieveRm.done(); } }).get(); - - // Validate that cache is still disabled without data. - assertCacheResetWithoutData(); + + // Validate that cache accepts the canceled request data + assertCacheValidWithData(1); } @Test - public void cancelWhilePendingTest() throws InterruptedException, ExecutionException { + public void cancelWhilePendingWithoutClientNotificationTest() throws InterruptedException, ExecutionException { // Request data from cache Query<Integer> q = new Query<Integer>() { @Override protected void execute(DataRequestMonitor<Integer> rm) { - fTestCache.update(rm); + fTestCache.update(new DataRequestMonitor<Integer>(ImmediateExecutor.getInstance(), rm) { + @Override + public synchronized void addCancelListener(ICanceledListener listener) { + // Do not add the cancel listener so that the cancel request is not + // propagated to the cache. + } + }); } }; fExecutor.execute(q); @@ -382,12 +467,13 @@ public class CacheTests { // Cancel the client request q.cancel(true); + + assertCacheCanceled(); + try { q.get(); Assert.fail("Expected a cancellation exception"); } catch (CancellationException e) {} // Expected exception; - - assertCacheCanceled(); // Completed the retrieve RM fExecutor.submit(new DsfRunnable() { @@ -401,36 +487,80 @@ public class CacheTests { assertCacheValidWithData(1); } + /** + * This test forces a race condition where a client that requested data + * cancels. While shortly after a second client starts a new request. + * The first request's cancel should not interfere with the second + * request. + */ @Test - public void cancelWhilePendingWithoutClientNotificationTest() throws InterruptedException, ExecutionException { - // Request data from cache - Query<Integer> q = new Query<Integer>() { + public void cancelAfterCompletedRaceCondition() throws InterruptedException, ExecutionException { + + // Create a client request with a badly behaved cancel implementation. + @SuppressWarnings("unchecked") + final DataRequestMonitor<Integer>[] rmBad = (DataRequestMonitor<Integer>[])new DataRequestMonitor<?>[1] ; + final boolean qBadCanceled[] = new boolean[] { false }; + Query<Integer> qBad = new Query<Integer>() { @Override protected void execute(DataRequestMonitor<Integer> rm) { - fTestCache.update(new DataRequestMonitor<Integer>(ImmediateExecutor.getInstance(), rm) { + rmBad[0] = new DataRequestMonitor<Integer>(ImmediateExecutor.getInstance(), rm) { @Override - public synchronized void addCancelListener(ICanceledListener listener) { + public synchronized void removeCancelListener(ICanceledListener listener) { // Do not add the cancel listener so that the cancel request is not // propagated to the cache. } - }); + + @Override + public void cancel() { + if (qBadCanceled[0]) { + super.cancel(); + } + } + + @Override + public synchronized boolean isCanceled() { + return qBadCanceled[0]; + } + + @Override + public synchronized void done() { + // Avoid clearing cancel listeners list + }; + }; + + fTestCache.update(rmBad[0]); } }; - fExecutor.execute(q); + fExecutor.execute(qBad); // Wait until the cache starts data retrieval. waitForRetrieveRm(); - // Cancel the client request - q.cancel(true); + // Reset the cache + fExecutor.submit(new DsfRunnable() { + public void run() { + fRetrieveRm = null; + fTestCache.set(null, Status.OK_STATUS); + fTestCache.reset(); + } + }).get(); - assertCacheCanceled(); + Query<Integer> qGood = new Query<Integer>() { + @Override + protected void execute(DataRequestMonitor<Integer> rm) { + fTestCache.update(rm); + } + }; + fExecutor.execute(qGood); - try { - q.get(); - Assert.fail("Expected a cancellation exception"); - } catch (CancellationException e) {} // Expected exception; + // Wait until the cache starts data retrieval. + waitForRetrieveRm(); + + qBadCanceled[0] = true; + rmBad[0].cancel(); + Assert.assertFalse(fRetrieveRm.isCanceled()); + // Completed the retrieve RM fExecutor.submit(new DsfRunnable() { public void run() { @@ -439,7 +569,8 @@ public class CacheTests { } }).get(); - // Validate that cache accepts the canceled request data + qGood.get(); + assertCacheValidWithData(1); } @@ -594,19 +725,4 @@ public class CacheTests { // Check final state assertCacheResetWithoutData(); } - - @Test - public void resetWithValueTest() throws InterruptedException, ExecutionException { - // Disable the cache with a given value - fExecutor.submit(new DsfRunnable() { - public void run() { - fTestCache.reset(2, Status.OK_STATUS); - } - }).get(); - - // Validate that cache is disabled without data. - Assert.assertFalse(fTestCache.isValid()); - Assert.assertEquals(2, (int)fTestCache.getData()); - Assert.assertTrue(fTestCache.getStatus().isOK()); - } -} +}
\ No newline at end of file diff --git a/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/RangeCacheTests.java b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/RangeCacheTests.java new file mode 100644 index 00000000000..e6701914a09 --- /dev/null +++ b/dsf/org.eclipse.cdt.tests.dsf/src/org/eclipse/cdt/tests/dsf/concurrent/RangeCacheTests.java @@ -0,0 +1,407 @@ +/******************************************************************************* + * Copyright (c) 2006 Wind River Systems and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.cdt.tests.dsf.concurrent; + +import java.util.ArrayList; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; + +import junit.framework.Assert; + +import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; +import org.eclipse.cdt.dsf.concurrent.DsfRunnable; +import org.eclipse.cdt.dsf.concurrent.ICache; +import org.eclipse.cdt.dsf.concurrent.IDsfStatusConstants; +import org.eclipse.cdt.dsf.concurrent.ImmediateInDsfExecutor; +import org.eclipse.cdt.dsf.concurrent.Query; +import org.eclipse.cdt.dsf.concurrent.RangeCache; +import org.eclipse.cdt.tests.dsf.DsfTestPlugin; +import org.eclipse.cdt.tests.dsf.TestDsfExecutor; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests that exercise the DataCache object. + */ +public class RangeCacheTests { + + class TestRangeCache extends RangeCache<Integer> { + + public TestRangeCache() { + super(new ImmediateInDsfExecutor(fExecutor)); + } + + @Override + protected void retrieve(long offset, int count, DataRequestMonitor<List<Integer>> rm) { + fRetrieveInfos.add(new RetrieveInfo(offset, count, rm)); + } + + @Override + public void reset() { + super.reset(); + } + + @Override + public void set(long offset, int count, List<Integer> data, IStatus status) { + super.set(offset, count, data, status); + } + } + + class TestQuery extends Query<List<Integer>> { + long fOffset; + int fCount; + TestQuery(long offset, int count) { + fOffset = offset; + fCount = count; + } + + @Override + protected void execute(DataRequestMonitor<List<Integer>> rm) { + fRangeCache = fTestCache.getRange(fOffset, fCount); + fRangeCache.update(rm); + } + } + + class RetrieveInfo implements Comparable<RetrieveInfo> { + long fOffset; + int fCount; + DataRequestMonitor<List<Integer>> fRm; + RetrieveInfo(long offset, int count, DataRequestMonitor<List<Integer>> rm) { + fOffset = offset; + fCount = count; + fRm = rm; + } + + public int compareTo(RetrieveInfo o) { + if (fOffset > o.fOffset) { + return 1; + } else if (fOffset == o.fOffset) { + return 0; + } else /*if (fOffset < o.fOffset)*/ { + return -1; + } + } + } + + TestDsfExecutor fExecutor; + TestRangeCache fTestCache; + SortedSet<RetrieveInfo> fRetrieveInfos; + ICache<List<Integer>> fRangeCache; + + private List<Integer> makeList(long offset, int count) { + List<Integer> list = new ArrayList<Integer>(count); + for (int i = 0; i < count; i++) { + list.add((int)(i + offset)); + } + return list; + } + + /** + * There's no rule on how quickly the cache has to start data retrieval + * after it has been requested. It could do it immediately, or it could + * wait a dispatch cycle, etc.. + */ + private void waitForRetrieveRm(int size) { + synchronized(this) { + while (fRetrieveInfos.size() < size) { + try { + wait(100); + } catch (InterruptedException e) { + return; + } + } + } + } + + @Before + public void startExecutor() throws ExecutionException, InterruptedException { + fExecutor = new TestDsfExecutor(); + fTestCache = new TestRangeCache(); + fRetrieveInfos = new TreeSet<RetrieveInfo>(); + fRangeCache = null; + } + + @After + public void shutdownExecutor() throws ExecutionException, InterruptedException { + fExecutor.submit(new DsfRunnable() { public void run() { + fExecutor.shutdown(); + }}).get(); + if (fExecutor.exceptionsCaught()) { + Throwable[] exceptions = fExecutor.getExceptions(); + throw new ExecutionException(exceptions[0]); + } + fTestCache = null; + fExecutor = null; + } + + private void assertCacheValidWithData(ICache<List<Integer>> cache, long offset, int count) { + Assert.assertTrue(cache.isValid()); + Assert.assertEquals(makeList(offset, count), cache.getData()); + Assert.assertTrue(cache.getStatus().isOK()); + } + + private void assertCacheWaiting(ICache<List<Integer>> cache) { + Assert.assertFalse(cache.isValid()); + try { + cache.getData(); + Assert.fail("Expected an IllegalStateException"); + } catch (IllegalStateException e) {} + try { + cache.getStatus(); + Assert.fail("Expected an IllegalStateException"); + } catch (IllegalStateException e) {} + } + + private void completeInfo(RetrieveInfo info, long offset, int count) { + Assert.assertEquals(offset, info.fOffset); + Assert.assertEquals(count, info.fCount); + info.fRm.setData(makeList(offset, count)); + info.fRm.done(); + } + + private void getRange(long queryOffset, int queryCount, long[] retrieveOffsets, int retrieveCounts[]) throws InterruptedException, ExecutionException { + assert retrieveOffsets.length == retrieveCounts.length; + int retrieveCount = retrieveOffsets.length; + + // Request data from cache + TestQuery q = new TestQuery(queryOffset, queryCount); + + fRangeCache = null; + fRetrieveInfos.clear(); + + fExecutor.execute(q); + + // Wait until the cache requests the data. + waitForRetrieveRm(retrieveOffsets.length); + + if (retrieveCount != 0) { + assertCacheWaiting(fRangeCache); + + // Set the data without using an executor. + Assert.assertEquals(retrieveCount, fRetrieveInfos.size()); + int i = 0; + for (RetrieveInfo info : fRetrieveInfos) { + completeInfo(info, retrieveOffsets[i], retrieveCounts[i]); + i++; + } + } + + // Wait for data. + Assert.assertEquals(makeList(queryOffset, queryCount), q.get()); + + // Check state while waiting for data + assertCacheValidWithData(fRangeCache, queryOffset, queryCount); + } + + @Test + public void getOneRangeTest() throws InterruptedException, ExecutionException { + getRange(0, 100, new long[] { 0 }, new int[] { 100 }); + } + + @Test + public void getMultipleRangesTest() throws InterruptedException, ExecutionException { + // Retrieve a range in-between two cached ranges + getRange(0, 100, new long[] { 0 }, new int[] { 100 }); + getRange(200, 100, new long[] { 200 }, new int[] { 100 }); + getRange(0, 300, new long[] { 100 }, new int[] { 100 }); + + // Retrieve a range overlapping two cached ranges + getRange(1000, 100, new long[] { 1000 }, new int[] { 100 }); + getRange(1200, 100, new long[] { 1200 }, new int[] { 100 }); + getRange(900, 500, new long[] { 900, 1100, 1300 }, new int[] { 100, 100, 100 }); + + // Retrieve a range that's a subset of a cached range. + getRange(2000, 100, new long[] { 2000 }, new int[] { 100 }); + getRange(2000, 50, new long[] {}, new int[] {}); + getRange(2025, 50, new long[] {}, new int[] {}); + getRange(2050, 50, new long[] {}, new int[] {}); + } + + private void cancelRange(long queryOffset, int queryCount, long[] retrieveOffsets, int retrieveCounts[]) throws Exception { + int retrieveCount = retrieveOffsets.length; + + // Request data from cache + TestQuery q = new TestQuery(queryOffset, queryCount); + + fRangeCache = null; + fRetrieveInfos.clear(); + + fExecutor.execute(q); + + // Wait until the cache requests the data. + waitForRetrieveRm(retrieveCount); + + assertCacheWaiting(fRangeCache); + + // Set the data without using an executor. + Assert.assertEquals(retrieveCount, fRetrieveInfos.size()); + int i = 0; + for (RetrieveInfo info : fRetrieveInfos) { + Assert.assertEquals(retrieveOffsets[i], info.fOffset); + Assert.assertEquals(retrieveCounts[i], info.fCount); + Assert.assertFalse(info.fRm.isCanceled()); + i++; + } + + q.cancel(true); + try { + q.get(); + Assert.fail("Expected a cancellation exception"); + } catch (CancellationException e) {} // Expected exception; + + for (RetrieveInfo info : fRetrieveInfos) { + Assert.assertTrue(info.fRm.isCanceled()); + } + } + + @Test + public void cancelOneRangeTest() throws Exception { + cancelRange(0, 100, new long[] { 0 }, new int[] { 100 }); + } + + @Test + public void cancelMultipleRangesTest() throws Exception { + // Cancel a couple of ranges. + cancelRange(0, 100, new long[] { 0 }, new int[] { 100 }); + cancelRange(200, 100, new long[] { 200 }, new int[] { 100 }); + + // Cancel a range overlapping two previously canceled ranges. + cancelRange(0, 300, new long[] { 0 }, new int[] { 300 }); + } + + @Test + public void getAndCancelMultipleRangesTest() throws Exception { + // Cancel a range, then retrieve the same range + cancelRange(0, 100, new long[] { 0 }, new int[] { 100 }); + getRange(0, 100, new long[] { 0 }, new int[] { 100 }); + + // Cancel a range overlapping a cached range. + cancelRange(0, 200, new long[] { 100 }, new int[] { 100 }); + } + + @Test + public void resetOneRangeTest() throws InterruptedException, ExecutionException { + getRange(0, 100, new long[] { 0 }, new int[] { 100 }); + + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.reset(); + }; + }).get(); + + getRange(0, 100, new long[] { 0 }, new int[] { 100 }); + } + + @Test + public void resetMultipleRangesTest() throws InterruptedException, ExecutionException { + // Retrieve a range in-between two cached ranges + getRange(0, 100, new long[] { 0 }, new int[] { 100 }); + getRange(200, 100, new long[] { 200 }, new int[] { 100 }); + getRange(0, 300, new long[] { 100 }, new int[] { 100 }); + + // Retrieve a range overlapping two cached ranges + getRange(1000, 100, new long[] { 1000 }, new int[] { 100 }); + getRange(1200, 100, new long[] { 1200 }, new int[] { 100 }); + getRange(900, 500, new long[] { 900, 1100, 1300 }, new int[] { 100, 100, 100 }); + + // Retrieve a range that's a subset of a cached range. + getRange(2000, 100, new long[] { 2000 }, new int[] { 100 }); + getRange(2000, 50, new long[] {}, new int[] {}); + getRange(2025, 50, new long[] {}, new int[] {}); + getRange(2050, 50, new long[] {}, new int[] {}); + + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.reset(); + }; + }).get(); + + // Retrieve a range in-between two cached ranges + getRange(0, 100, new long[] { 0 }, new int[] { 100 }); + getRange(200, 100, new long[] { 200 }, new int[] { 100 }); + getRange(0, 300, new long[] { 100 }, new int[] { 100 }); + + // Retrieve a range overlapping two cached ranges + getRange(1000, 100, new long[] { 1000 }, new int[] { 100 }); + getRange(1200, 100, new long[] { 1200 }, new int[] { 100 }); + getRange(900, 500, new long[] { 900, 1100, 1300 }, new int[] { 100, 100, 100 }); + + // Retrieve a range that's a subset of a cached range. + getRange(2000, 100, new long[] { 2000 }, new int[] { 100 }); + getRange(2000, 50, new long[] {}, new int[] {}); + getRange(2025, 50, new long[] {}, new int[] {}); + getRange(2050, 50, new long[] {}, new int[] {}); + } + + @Test + public void resetWhileInvalidTest() throws InterruptedException, ExecutionException { + // Request data from cache + TestQuery q = new TestQuery(10, 100); + + fRangeCache = null; + fRetrieveInfos.clear(); + + fExecutor.execute(q); + + // Wait until the cache requests the data. + waitForRetrieveRm(1); + + assertCacheWaiting(fRangeCache); + + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.reset(); + }; + }).get(); + + // Set the data without using an executor. + Assert.assertEquals(1, fRetrieveInfos.size()); + completeInfo(fRetrieveInfos.first(), 10, 100); + + // Wait for data. + Assert.assertEquals(makeList(10, 100), q.get()); + + // Check state while waiting for data + assertCacheValidWithData(fRangeCache, 10, 100); + } + + @Test + public void setOneRangeTest() throws InterruptedException, ExecutionException { + getRange(0, 100, new long[] { 0 }, new int[] { 100 }); + + fExecutor.submit(new DsfRunnable() { + public void run() { + fTestCache.set(0, 100, null, new Status(IStatus.ERROR, DsfTestPlugin.PLUGIN_ID, IDsfStatusConstants.INVALID_STATE, "Cache invalid", null)); + }; + }).get(); + + // Request data from cache + TestQuery q = new TestQuery(10, 100); + + fRangeCache = null; + fRetrieveInfos.clear(); + + fExecutor.execute(q); + + try { + q.get(); + Assert.fail("Expected an ExecutionException"); + } catch (ExecutionException e) {} + } + + +} |