Skip to main content
summaryrefslogtreecommitdiffstats
path: root/dsf
diff options
context:
space:
mode:
authorPawel Piech2011-11-30 16:40:41 -0500
committerPawel Piech2011-11-30 16:40:41 -0500
commit58513ccf2d81c50492e2087a87d31cd15e9e60df (patch)
tree62debcd500079491cd47c81ad71858b88449ae20 /dsf
parent3bd4f3d0fcb4407ef8eb7e8972126d9f272e31c4 (diff)
downloadorg.eclipse.cdt-58513ccf2d81c50492e2087a87d31cd15e9e60df.tar.gz
org.eclipse.cdt-58513ccf2d81c50492e2087a87d31cd15e9e60df.tar.xz
org.eclipse.cdt-58513ccf2d81c50492e2087a87d31cd15e9e60df.zip
Bug 310345 - [concurrent] Asynchronous Cache Programming Model (ACPM)
utilities for DSF - Added an ACPM example to DSF examples plugin.
Diffstat (limited to 'dsf')
-rw-r--r--dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java66
-rw-r--r--dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java18
-rw-r--r--dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java9
-rw-r--r--dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java32
-rw-r--r--dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncDataViewer.java65
-rw-r--r--dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithExecutor.java132
-rw-r--r--dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithThread.java62
-rw-r--r--dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/IDataGenerator.java14
-rw-r--r--dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/SyncDataViewer.java70
9 files changed, 290 insertions, 178 deletions
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java
index 1f22902065..84ad02dd93 100644
--- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java
+++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/AbstractCache.java
@@ -154,6 +154,27 @@ public abstract class AbstractCache<V> implements ICache<V> {
rm.done();
}
}
+
+ private void completeWaitingRms() {
+ Object waiting = null;
+ synchronized(this) {
+ waiting = fWaitingList;
+ fWaitingList = null;
+ }
+ if (waiting != null) {
+ if (waiting instanceof RequestMonitor) {
+ completeWaitingRm((RequestMonitor)waiting);
+ } else if (waiting instanceof RequestMonitor[]) {
+ RequestMonitor[] waitingList = (RequestMonitor[])waiting;
+ for (int i = 0; i < waitingList.length; i++) {
+ if (waitingList[i] != null) {
+ completeWaitingRm(waitingList[i]);
+ }
+ }
+ }
+ waiting = null;
+ }
+ }
private void completeWaitingRm(RequestMonitor rm) {
rm.setStatus(fStatus);
@@ -300,23 +321,32 @@ public abstract class AbstractCache<V> implements ICache<V> {
fStatus = status;
fValid = true;
- Object waiting = null;
- synchronized(this) {
- waiting = fWaitingList;
- fWaitingList = null;
- }
- if (waiting != null) {
- if (waiting instanceof RequestMonitor) {
- completeWaitingRm((RequestMonitor)waiting);
- } else if (waiting instanceof RequestMonitor[]) {
- RequestMonitor[] waitingList = (RequestMonitor[])waiting;
- for (int i = 0; i < waitingList.length; i++) {
- if (waitingList[i] != null) {
- completeWaitingRm(waitingList[i]);
- }
- }
- }
- waiting = null;
- }
+ completeWaitingRms();
}
+
+ /**
+ * Performs the set and reset operations in one step This allows the cache to
+ * remain in invalid state, but to notify any waiting listeners that the state of
+ * the cache has changed.
+ *
+ * @param data
+ * The data that should be returned to any clients waiting for
+ * cache data and for clients requesting data until the cache is
+ * invalidated.
+ * @status The status that should be returned to any clients waiting for
+ * cache data and for clients requesting data until the cache is
+ * invalidated
+ *
+ * @see #reset(Object, IStatus)
+ */
+ protected void setAndReset(V data, IStatus status) {
+ assert fExecutor.getDsfExecutor().isInExecutorThread();
+
+ fData = data;
+ fStatus = status;
+ fValid = false;
+
+ completeWaitingRms();
+ }
+
}
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java
index 9f8863c3da..8c5c09eb97 100644
--- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java
+++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RangeCache.java
@@ -107,8 +107,7 @@ abstract public class RangeCache<V> {
protected List<V> process() throws InvalidCacheException, CoreException {
clearCanceledRequests();
- List<ICache<?>> transactionRequests = getRequests(fOffset, fCount);
-
+ List<Request> transactionRequests = getRequests(fOffset, fCount);
validate(transactionRequests);
return makeElementsListFromRequests(transactionRequests, fOffset, fCount);
@@ -156,7 +155,7 @@ abstract public class RangeCache<V> {
public ICache<List<V>> getRange(final long offset, final int count) {
assert fExecutor.getDsfExecutor().isInExecutorThread();
- List<ICache<?>> requests = getRequests(offset, count);
+ List<Request> requests = getRequests(offset, count);
RequestCache<List<V>> range = new RequestCache<List<V>>(fExecutor) {
@Override
@@ -232,8 +231,8 @@ abstract public class RangeCache<V> {
}
}
- private List<ICache<?>> getRequests(long fOffset, int fCount) {
- List<ICache<?>> requests = new ArrayList<ICache<?>>(1);
+ private List<Request> getRequests(long fOffset, int fCount) {
+ List<Request> requests = new ArrayList<Request>(1);
// Create a new request for the data to retrieve.
Request current = new Request(fOffset, fCount);
@@ -252,7 +251,7 @@ abstract public class RangeCache<V> {
// Adjust the beginning of the requested range of data. If there
// is already an overlapping range in front of the requested range,
// then use it.
- private Request adjustRequestHead(Request request, List<ICache<?>> transactionRequests, long offset, int count) {
+ private Request adjustRequestHead(Request request, List<Request> transactionRequests, long offset, int count) {
SortedSet<Request> headRequests = fRequests.headSet(request);
if (!headRequests.isEmpty()) {
Request headRequest = headRequests.last();
@@ -276,7 +275,7 @@ abstract public class RangeCache<V> {
* @param transactionRequests
* @return
*/
- private Request adjustRequestTail(Request current, List<ICache<?>> transactionRequests, long offset, int count) {
+ private Request adjustRequestTail(Request current, List<Request> transactionRequests, long offset, int count) {
// Create a duplicate of the tailSet, in order to avoid a concurrent modification exception.
List<Request> tailSet = new ArrayList<Request>(fRequests.tailSet(current));
@@ -313,14 +312,13 @@ abstract public class RangeCache<V> {
return current;
}
- private List<V> makeElementsListFromRequests(List<ICache<?>> requests, long offset, int count) {
+ private List<V> makeElementsListFromRequests(List<Request> requests, long offset, int count) {
List<V> retVal = new ArrayList<V>(count);
long index = offset;
long end = offset + count;
int requestIdx = 0;
while (index < end ) {
- @SuppressWarnings("unchecked")
- Request request = (Request)requests.get(requestIdx);
+ Request request = requests.get(requestIdx);
if (index < request.fOffset + request.fCount) {
retVal.add( request.getData().get((int)(index - request.fOffset)) );
index ++;
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java
index 76b94d6e14..636db58b52 100644
--- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java
+++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/RequestCache.java
@@ -94,4 +94,13 @@ public abstract class RequestCache<V> extends AbstractCache<V> {
}
super.set(data, status);
}
+
+ @Override
+ protected void reset() {
+ if (fRm != null) {
+ fRm.cancel();
+ fRm = null;
+ }
+ super.reset();
+ }
}
diff --git a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java
index ef72971b5a..2b047bdbb9 100644
--- a/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java
+++ b/dsf/org.eclipse.cdt.dsf/src/org/eclipse/cdt/dsf/concurrent/Transaction.java
@@ -70,8 +70,10 @@ public abstract class Transaction<V> {
* logic once the cache object has been updated from the source.
*
* @return the cached data if it's valid, otherwise an exception is thrown
- * @throws InvalidCacheException
- * @throws CoreException
+ * @throws Transaction.InvalidCacheException Exception indicating that a
+ * cache is not valid and transaction will need to be rescheduled.
+ * @throws CoreException Exception indicating that one of the caches is
+ * in error state and transaction cannot be processed.
*/
abstract protected V process() throws InvalidCacheException, CoreException;
@@ -149,19 +151,20 @@ public abstract class Transaction<V> {
* See {@link #validate(RequestCache)}. This variant simply validates
* multiple cache objects.
*/
- public void validate(ICache<?> ... caches) throws InvalidCacheException, CoreException {
+ public <T> void validate(ICache<?> ... caches) throws InvalidCacheException, CoreException {
validate(Arrays.asList(caches));
}
-
- /**
- * See {@link #validate(RequestCache)}. This variant simply validates
- * multiple cache objects.
- */
- public void validate(Iterable<ICache<?>> caches) throws InvalidCacheException, CoreException {
+
+ /**
+ * See {@link #validate(RequestCache)}. This variant simply validates
+ * multiple cache objects.
+ */
+ public void validate(@SuppressWarnings("rawtypes") Iterable caches) throws InvalidCacheException, CoreException {
// Check if any of the caches have errors:
boolean allValid = true;
- for (ICache<?> cache : caches) {
+ for (Object cacheObj : caches) {
+ ICache<?> cache = (ICache<?>)cacheObj;
if (cache.isValid()) {
if (!cache.getStatus().isOK()) {
throw new CoreException(cache.getStatus());
@@ -171,9 +174,9 @@ public abstract class Transaction<V> {
}
}
if (!allValid) {
- // Throw the invalid cache exception, but first schedule a
- // re-attempt of the transaction logic, to occur when the
- // stale/unset cache objects have been updated
+ // Throw the invalid cache exception, but first schedule a
+ // re-attempt of the transaction logic, to occur when the
+ // stale/unset cache objects have been updated
CountingRequestMonitor countringRm = new CountingRequestMonitor(ImmediateExecutor.getInstance(), fRm) {
@Override
protected void handleCompleted() {
@@ -181,7 +184,8 @@ public abstract class Transaction<V> {
}
};
int count = 0;
- for (ICache<?> cache : caches) {
+ for (Object cacheObj : caches) {
+ ICache<?> cache = (ICache<?>)cacheObj;
if (!cache.isValid()) {
cache.update(countringRm);
count++;
diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncDataViewer.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncDataViewer.java
index 01e29901fb..d19ef9920a 100644
--- a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncDataViewer.java
+++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/AsyncDataViewer.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2006, 2009 Wind River Systems and others.
+ * Copyright (c) 2006, 2011 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@@ -66,13 +66,15 @@ public class AsyncDataViewer
final private IDataGenerator fDataGenerator;
// Fields used in request cancellation logic.
- private List<ValueDataRequestMonitor> fItemDataRequestMonitors = new LinkedList<ValueDataRequestMonitor>();
+ private List<ValueDataRequestMonitor> fItemDataRequestMonitors =
+ new LinkedList<ValueDataRequestMonitor>();
private Set<Integer> fIndexesToCancel = new HashSet<Integer>();
private int fCancelCallsPending = 0;
public AsyncDataViewer(TableViewer viewer, IDataGenerator generator) {
fViewer = viewer;
- fDisplayExecutor = DisplayDsfExecutor.getDisplayDsfExecutor(fViewer.getTable().getDisplay());
+ fDisplayExecutor = DisplayDsfExecutor.getDisplayDsfExecutor(
+ fViewer.getTable().getDisplay());
fDataGenerator = generator;
fDataGenerator.addListener(this);
}
@@ -104,10 +106,18 @@ public class AsyncDataViewer
1, TimeUnit.MILLISECONDS);
}
+ /**
+ * Calculates the number of visible items based on the top item index and
+ * table bounds.
+ * @param top Index of top item.
+ * @return calculated number of items in viewer
+ */
private int getVisibleItemCount(int top) {
Table table = fViewer.getTable();
int itemCount = table.getItemCount();
- return Math.min((table.getBounds().height / table.getItemHeight()) + 2, itemCount - top);
+ return Math.min(
+ (table.getBounds().height / table.getItemHeight()) + 2,
+ itemCount - top);
}
@ThreadSafe
@@ -131,7 +141,10 @@ public class AsyncDataViewer
}});
}
-
+ /**
+ * Retrieve the up to date count. When a new count is set to viewer, the
+ * viewer will refresh all items as well.
+ */
private void queryItemCount() {
// Request count from data provider. When the count is returned, we
// have to re-dispatch into the display thread to avoid calling
@@ -150,13 +163,25 @@ public class AsyncDataViewer
}
}
});
-
}
-
- // Dedicated class for data item requests. This class holds the index
- // argument so it can be examined when canceling stale requests.
- private class ValueDataRequestMonitor extends DataRequestMonitor<String> {
+
+ /**
+ * Retrieves value of an element at given index. When complete the value
+ * is written to the viewer.
+ * @param index Index of value to retrieve.
+ */
+ private void queryValue(final int index) {
+ ValueDataRequestMonitor rm = new ValueDataRequestMonitor(index);
+ fItemDataRequestMonitors.add(rm);
+ fDataGenerator.getValue(index, rm);
+ }
+
+ /**
+ * Dedicated class for data item requests. This class holds the index
+ * argument so it can be examined when canceling stale requests.
+ */
+ private class ValueDataRequestMonitor extends DataRequestMonitor<Integer> {
/** Index is used when canceling stale requests. */
int fIndex;
@@ -170,7 +195,8 @@ public class AsyncDataViewer
protected void handleCompleted() {
fItemDataRequestMonitors.remove(this);
- // Check if the request completed successfully, otherwise ignore it.
+ // Check if the request completed successfully, otherwise ignore
+ // it.
if (isSuccess()) {
if (!fViewer.getTable().isDisposed()) {
fViewer.replace(getData(), fIndex);
@@ -178,12 +204,6 @@ public class AsyncDataViewer
}
}
}
-
- private void queryValue(final int index) {
- ValueDataRequestMonitor rm = new ValueDataRequestMonitor(index);
- fItemDataRequestMonitors.add(rm);
- fDataGenerator.getValue(index, rm);
- }
private void cancelStaleRequests(int topIdx, int botIdx) {
// Decrement the count of outstanding cancel calls.
@@ -194,7 +214,10 @@ public class AsyncDataViewer
// Go through the outstanding requests and cancel any that
// are not visible anymore.
- for (Iterator<ValueDataRequestMonitor> itr = fItemDataRequestMonitors.iterator(); itr.hasNext();) {
+ for (Iterator<ValueDataRequestMonitor> itr =
+ fItemDataRequestMonitors.iterator();
+ itr.hasNext();)
+ {
ValueDataRequestMonitor item = itr.next();
if (item.fIndex < topIdx || item.fIndex > botIdx) {
// Set the item to canceled status, so that the data provider
@@ -237,14 +260,16 @@ public class AsyncDataViewer
Font font = new Font(display, "Courier", 10, SWT.NORMAL);
// Create the table viewer.
- TableViewer tableViewer = new TableViewer(shell, SWT.BORDER | SWT.VIRTUAL);
+ TableViewer tableViewer =
+ new TableViewer(shell, SWT.BORDER | SWT.VIRTUAL);
tableViewer.getControl().setLayoutData(data);
// Create the data generator.
final IDataGenerator generator = new DataGeneratorWithExecutor();
// Create the content provider which will populate the viewer.
- AsyncDataViewer contentProvider = new AsyncDataViewer(tableViewer, generator);
+ AsyncDataViewer contentProvider =
+ new AsyncDataViewer(tableViewer, generator);
tableViewer.setContentProvider(contentProvider);
tableViewer.setInput(new Object());
diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithExecutor.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithExecutor.java
index 3a7393c7fd..d1ee0b542e 100644
--- a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithExecutor.java
+++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithExecutor.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2006, 2009 Wind River Systems and others.
+ * Copyright (c) 2006, 2011 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@@ -14,14 +14,14 @@ package org.eclipse.cdt.examples.dsf.dataviewer;
//#package org.eclipse.cdt.examples.dsf.dataviewer.answers;
//#endif
-import java.util.HashSet;
+import java.util.HashMap;
//#ifdef answers
//#import java.util.Iterator;
//#endif
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
-import java.util.Set;
+import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -70,6 +70,16 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
Request(RequestMonitor rm) {
fRequestMonitor = rm;
+
+ rm.addCancelListener(new RequestMonitor.ICanceledListener() {
+ public void requestCanceled(RequestMonitor rm) {
+ fExecutor.execute(new DsfRunnable() {
+ public void run() {
+ fQueue.remove(Request.this);
+ }
+ });
+ }
+ });
}
}
@@ -93,7 +103,7 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//#endif
class ItemRequest extends Request {
final int fIndex;
- ItemRequest(int index, DataRequestMonitor<String> rm) {
+ ItemRequest(int index, DataRequestMonitor<Integer> rm) {
super(rm);
fIndex = index;
}
@@ -156,24 +166,20 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//#else
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
- private Set<Integer> fChangedIndexes = new HashSet<Integer>();
+ private Map<Integer, Integer> fChangedValues =
+ new HashMap<Integer, Integer>();
- // Flag used to ensure that requests are processed sequentially.
- //#ifdef exercises
- // TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor)
- // indicating allowed thread access to this class/method/member
- //#else
-//# @ConfinedToDsfExecutor("fExecutor")
- //#endif
- private boolean fServiceQueueInProgress = false;
-
+ public DataGeneratorWithExecutor() {
+ // Create the executor
+ this(new DefaultDsfExecutor("Supplier Executor"));
+ }
//#ifdef exercises
// TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor)
// indicating allowed thread access to this class/method/member
//#endif
- public DataGeneratorWithExecutor() {
+ public DataGeneratorWithExecutor(DsfExecutor executor) {
// Create the executor
- fExecutor = new DefaultDsfExecutor("Supplier Executor");
+ fExecutor = executor;
// Schedule a runnable to make the random changes.
fExecutor.scheduleAtFixedRate(
@@ -182,8 +188,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
randomChanges();
}
},
- RANDOM_CHANGE_INTERVAL,
- RANDOM_CHANGE_INTERVAL,
+ new Random().nextInt() % RANDOM_CHANGE_INTERVAL,
+ RANDOM_CHANGE_INTERVAL, //Add a 10% variance to the interval.
TimeUnit.MILLISECONDS);
}
@@ -197,8 +203,9 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
public void run() {
// Empty the queue of requests and fail them.
for (Request request : fQueue) {
- request.fRequestMonitor.setStatus(
- new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
+ request.fRequestMonitor.setStatus(new Status(
+ IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
+ "Supplier shut down"));
request.fRequestMonitor.done();
}
fQueue.clear();
@@ -209,7 +216,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
}
});
} catch (RejectedExecutionException e) {
- rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
+ rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
+ "Supplier shut down"));
rm.done();
}
}
@@ -227,7 +235,9 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
}
});
} catch (RejectedExecutionException e) {
- rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
+ rm.setStatus(new Status(
+ IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
+ "Supplier shut down"));
rm.done();
}
}
@@ -236,7 +246,7 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
// TODO Exercise 4 - Add an annotation (ThreadSafe/ConfinedToDsfExecutor)
// indicating allowed thread access to this class/method/member
//#endif
- public void getValue(final int index, final DataRequestMonitor<String> rm) {
+ public void getValue(final int index, final DataRequestMonitor<Integer> rm) {
try {
fExecutor.execute( new DsfRunnable() {
public void run() {
@@ -245,7 +255,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
}
});
} catch (RejectedExecutionException e) {
- rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
+ rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
+ "Supplier shut down"));
rm.done();
}
}
@@ -286,7 +297,16 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//# @ConfinedToDsfExecutor("fExecutor")
//#endif
private void serviceQueue() {
-
+ fExecutor.schedule(
+ new DsfRunnable() {
+ public void run() {
+ doServiceQueue();
+ }
+ },
+ PROCESSING_DELAY, TimeUnit.MILLISECONDS);
+ }
+
+ private void doServiceQueue() {
//#ifdef exercises
// TODO Exercise 3 - Add logic to discard cancelled requests from queue.
// Hint: Since serviceQueue() is called using the executor, and the
@@ -305,33 +325,16 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//# }
//#endif
- // If a queue servicing is already scheduled, do nothing.
- if (fServiceQueueInProgress) {
- return;
- }
-
- if (fQueue.size() != 0) {
+ while (fQueue.size() != 0) {
// If there are requests to service, remove one from the queue and
// schedule a runnable to process the request after a processing
// delay.
- fServiceQueueInProgress = true;
- final Request request = fQueue.remove(0);
- fExecutor.schedule(
- new DsfRunnable() {
- public void run() {
- if (request instanceof CountRequest) {
- processCountRequest((CountRequest)request);
- } else if (request instanceof ItemRequest) {
- processItemRequest((ItemRequest)request);
- }
-
- // Reset the processing flag and process next
- // request.
- fServiceQueueInProgress = false;
- serviceQueue();
- }
- },
- PROCESSING_DELAY, TimeUnit.MILLISECONDS);
+ Request request = fQueue.remove(0);
+ if (request instanceof CountRequest) {
+ processCountRequest((CountRequest)request);
+ } else if (request instanceof ItemRequest) {
+ processItemRequest((ItemRequest)request);
+ }
}
}
@@ -343,7 +346,8 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//#endif
private void processCountRequest(CountRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
- DataRequestMonitor<Integer> rm = (DataRequestMonitor<Integer>)request.fRequestMonitor;
+ DataRequestMonitor<Integer> rm =
+ (DataRequestMonitor<Integer>)request.fRequestMonitor;
rm.setData(fCount);
rm.done();
@@ -357,12 +361,13 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
//#endif
private void processItemRequest(ItemRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
- DataRequestMonitor<String> rm = (DataRequestMonitor<String>)request.fRequestMonitor;
+ DataRequestMonitor<Integer> rm =
+ (DataRequestMonitor<Integer>)request.fRequestMonitor;
- if (fChangedIndexes.contains(request.fIndex)) {
- rm.setData("Changed: " + request.fIndex);
+ if (fChangedValues.containsKey(request.fIndex)) {
+ rm.setData(fChangedValues.get(request.fIndex));
} else {
- rm.setData(Integer.toString(request.fIndex));
+ rm.setData(request.fIndex);
}
rm.done();
}
@@ -398,10 +403,11 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
private void randomCountReset() {
// Calculate the new count.
Random random = new java.util.Random();
- fCount = MIN_COUNT + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT);
+ fCount = MIN_COUNT +
+ Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT);
// Reset the changed values.
- fChangedIndexes.clear();
+ fChangedValues.clear();
// Notify listeners
for (Listener listener : fListeners) {
@@ -421,17 +427,19 @@ public class DataGeneratorWithExecutor implements IDataGenerator {
private void randomDataChange() {
// Calculate the indexes to change.
Random random = new java.util.Random();
- Set<Integer> set = new HashSet<Integer>();
+ Map<Integer, Integer> changed = new HashMap<Integer, Integer>();
for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) {
- set.add( new Integer(Math.abs(random.nextInt()) % fCount) );
- }
+ int randomIndex = Math.abs(random.nextInt()) % fCount;
+ int randomValue = Math.abs(random.nextInt()) % fCount;
+ changed.put(randomIndex, randomValue);
+ }
// Add the indexes to an overall set of changed indexes.
- fChangedIndexes.addAll(set);
+ fChangedValues.putAll(changed);
// Notify listeners
- for (Listener listener : fListeners) {
- listener.valuesChanged(set);
+ for (Object listener : fListeners) {
+ ((Listener)listener).valuesChanged(changed.keySet());
}
}
}
diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithThread.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithThread.java
index e4e736a14e..3b4a909e88 100644
--- a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithThread.java
+++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/DataGeneratorWithThread.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2006, 2009 Wind River Systems and others.
+ * Copyright (c) 2006, 2011 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@@ -15,9 +15,9 @@ package org.eclipse.cdt.examples.dsf.dataviewer;
//#endif
import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.Random;
-import java.util.Set;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -41,7 +41,9 @@ import org.eclipse.cdt.examples.dsf.DsfExamplesPlugin;
* synchronization.
* </p>
*/
-public class DataGeneratorWithThread extends Thread implements IDataGenerator {
+public class DataGeneratorWithThread extends Thread
+ implements IDataGenerator
+{
// Request objects are used to serialize the interface calls into objects
// which can then be pushed into a queue.
@@ -61,7 +63,7 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
class ItemRequest extends Request {
final int fIndex;
- ItemRequest(int index, DataRequestMonitor<String> rm) {
+ ItemRequest(int index, DataRequestMonitor<Integer> rm) {
super(rm);
fIndex = index;
}
@@ -76,7 +78,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
// Main request queue of the data generator. The getValue(), getCount(),
// and shutdown() methods write into the queue, while the run() method
// reads from it.
- private final BlockingQueue<Request> fQueue = new LinkedBlockingQueue<Request>();
+ private final BlockingQueue<Request> fQueue =
+ new LinkedBlockingQueue<Request>();
// ListenerList class provides thread safety.
private ListenerList fListeners = new ListenerList();
@@ -88,7 +91,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
private int fCountResetTrigger = 0;
// Elements which were modified since the last reset.
- private Set<Integer> fChangedIndexes = Collections.synchronizedSet(new HashSet<Integer>());
+ private Map<Integer, Integer> fChangedValues =
+ Collections.synchronizedMap(new HashMap<Integer, Integer>());
// Used to determine when to make changes in data.
private long fLastChangeTime = System.currentTimeMillis();
@@ -108,7 +112,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
fQueue.add(new ShutdownRequest(rm));
} else {
//
- rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
+ rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
+ "Supplier shut down"));
rm.done();
}
}
@@ -117,16 +122,18 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
if (!fShutdown.get()) {
fQueue.add(new CountRequest(rm));
} else {
- rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
+ rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
+ "Supplier shut down"));
rm.done();
}
}
- public void getValue(int index, DataRequestMonitor<String> rm) {
+ public void getValue(int index, DataRequestMonitor<Integer> rm) {
if (!fShutdown.get()) {
fQueue.add(new ItemRequest(index, rm));
} else {
- rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
+ rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID,
+ "Supplier shut down"));
rm.done();
}
}
@@ -150,7 +157,6 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
// If a request was dequeued, process it.
if (request != null) {
// Simulate a processing delay.
- Thread.sleep(PROCESSING_DELAY);
if (request instanceof CountRequest) {
processCountRequest((CountRequest)request);
@@ -162,6 +168,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
request.fRequestMonitor.done();
break;
}
+ } else {
+ Thread.sleep(PROCESSING_DELAY);
}
// Simulate data changes.
@@ -173,7 +181,8 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
private void processCountRequest(CountRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
- DataRequestMonitor<Integer> rm = (DataRequestMonitor<Integer>)request.fRequestMonitor;
+ DataRequestMonitor<Integer> rm =
+ (DataRequestMonitor<Integer>)request.fRequestMonitor;
rm.setData(fCount);
rm.done();
@@ -181,12 +190,13 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
private void processItemRequest(ItemRequest request) {
@SuppressWarnings("unchecked") // Suppress warning about lost type info.
- DataRequestMonitor<String> rm = (DataRequestMonitor<String>)request.fRequestMonitor;
+ DataRequestMonitor<Integer> rm =
+ (DataRequestMonitor<Integer>)request.fRequestMonitor;
- if (fChangedIndexes.contains(request.fIndex)) {
- rm.setData("Changed: " + request.fIndex);
+ if (fChangedValues.containsKey(request.fIndex)) {
+ rm.setData(fChangedValues.get(request.fIndex));
} else {
- rm.setData(Integer.toString(request.fIndex));
+ rm.setData(request.fIndex);
}
rm.done();
}
@@ -194,12 +204,14 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
private void randomChanges() {
// Check if enough time is elapsed.
- if (System.currentTimeMillis() > fLastChangeTime + RANDOM_CHANGE_INTERVAL) {
+ if (System.currentTimeMillis() >
+ fLastChangeTime + RANDOM_CHANGE_INTERVAL)
+ {
fLastChangeTime = System.currentTimeMillis();
// Once every number of changes, reset the count, the rest of the
// times just change certain values.
- if (++fCountResetTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0){
+ if (++fCountResetTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0) {
randomCountReset();
} else {
randomDataChange();
@@ -213,7 +225,7 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
fCount = MIN_COUNT + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT);
// Reset the changed values.
- fChangedIndexes.clear();
+ fChangedValues.clear();
// Notify listeners
for (Object listener : fListeners.getListeners()) {
@@ -224,17 +236,19 @@ public class DataGeneratorWithThread extends Thread implements IDataGenerator {
private void randomDataChange() {
// Calculate the indexes to change.
Random random = new java.util.Random();
- Set<Integer> set = new HashSet<Integer>();
+ Map<Integer, Integer> changed = new HashMap<Integer, Integer>();
for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) {
- set.add( new Integer(Math.abs(random.nextInt()) % fCount) );
+ int randomIndex = Math.abs(random.nextInt()) % fCount;
+ int randomValue = Math.abs(random.nextInt()) % fCount;
+ changed.put(randomIndex, randomValue);
}
// Add the indexes to an overall set of changed indexes.
- fChangedIndexes.addAll(set);
+ fChangedValues.putAll(changed);
// Notify listeners
for (Object listener : fListeners.getListeners()) {
- ((Listener)listener).valuesChanged(set);
+ ((Listener)listener).valuesChanged(changed.keySet());
}
}
}
diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/IDataGenerator.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/IDataGenerator.java
index 398b09a4e8..ef7eb53f27 100644
--- a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/IDataGenerator.java
+++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/IDataGenerator.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2006, 2009 Wind River Systems and others.
+ * Copyright (c) 2006, 2011 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@@ -41,11 +41,11 @@ public interface IDataGenerator {
// Changing the count range can stress the scalability of the system, while
// changing of the process delay and random change interval can stress
// its performance.
- final static int MIN_COUNT = 100;
- final static int MAX_COUNT = 200;
- final static int PROCESSING_DELAY = 10;
- final static int RANDOM_CHANGE_INTERVAL = 10000;
- final static int RANDOM_COUNT_CHANGE_INTERVALS = 3;
+ final static int MIN_COUNT = 50;
+ final static int MAX_COUNT = 100;
+ final static int PROCESSING_DELAY = 500;
+ final static int RANDOM_CHANGE_INTERVAL = 4000;
+ final static int RANDOM_COUNT_CHANGE_INTERVALS = 5;
final static int RANDOM_CHANGE_SET_PERCENTAGE = 10;
@@ -58,7 +58,7 @@ public interface IDataGenerator {
// Data access methods.
void getCount(DataRequestMonitor<Integer> rm);
- void getValue(int index, DataRequestMonitor<String> rm);
+ void getValue(int index, DataRequestMonitor<Integer> rm);
// Method used to shutdown the data generator including any threads that
// it may use.
diff --git a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/SyncDataViewer.java b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/SyncDataViewer.java
index 6ce9c06ef4..335d198415 100644
--- a/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/SyncDataViewer.java
+++ b/dsf/org.eclipse.cdt.examples.dsf/src_preprocess/org/eclipse/cdt/examples/dsf/dataviewer/SyncDataViewer.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2008, 2009 Wind River Systems and others.
+ * Copyright (c) 2008, 2011 Wind River Systems and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@@ -14,8 +14,11 @@ package org.eclipse.cdt.examples.dsf.dataviewer;
//#package org.eclipse.cdt.examples.dsf.dataviewer.answers;
//#endif
+import java.util.Arrays;
+import java.util.List;
import java.util.Set;
+import org.eclipse.cdt.dsf.concurrent.CountingRequestMonitor;
import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor;
import org.eclipse.cdt.dsf.concurrent.ImmediateExecutor;
import org.eclipse.cdt.dsf.concurrent.Query;
@@ -35,8 +38,8 @@ import org.eclipse.swt.widgets.Shell;
* This viewer implements the {@link IStructuredContentProvider} interface
* which is used by the JFace TableViewer class to populate a Table. This
* interface contains one principal methods for reading data {@link #getElements(Object)},
- * which synchronously returns an array of elements. In order to implement this
- * method using the asynchronous data generator, this provider uses the
+ * which synchronously returns an array of elements. In order to implement
+ * this method using the asynchronous data generator, this provider uses the
* {@link Query} object.
* </p>
*/
@@ -84,27 +87,43 @@ public class SyncDataViewer
return new Object[0];
}
- // Create the array that will be filled with elements.
- // For each index in the array execute a query to get the element at
- // that index.
- final Object[] elements = new Object[count];
-
- for (int i = 0; i < count; i++) {
- final int index = i;
- Query<String> valueQuery = new Query<String>() {
- @Override
- protected void execute(DataRequestMonitor<String> rm) {
- fDataGenerator.getValue(index, rm);
+ final int finalCount = count;
+ Query<List<Integer>> valueQuery = new Query<List<Integer>>() {
+ @Override
+ protected void execute(final DataRequestMonitor<List<Integer>> rm) {
+ final Integer[] retVal = new Integer[finalCount];
+ final CountingRequestMonitor crm = new CountingRequestMonitor(
+ ImmediateExecutor.getInstance(), rm)
+ {
+ @Override
+ protected void handleSuccess() {
+ rm.setData(Arrays.asList(retVal));
+ rm.done();
+ };
+ };
+ for (int i = 0; i < finalCount; i++) {
+ final int finalI = i;
+ fDataGenerator.getValue(
+ i,
+ new DataRequestMonitor<Integer>(
+ ImmediateExecutor.getInstance(), crm)
+ {
+ @Override
+ protected void handleSuccess() {
+ retVal[finalI] = getData();
+ crm.done();
+ }
+ });
}
- };
- ImmediateExecutor.getInstance().execute(valueQuery);
- try {
- elements[i] = valueQuery.get();
- } catch (Exception e) {
- elements[i] = "error";
- }
+ crm.setDoneCount(finalCount);
+ }
+ };
+ ImmediateExecutor.getInstance().execute(valueQuery);
+ try {
+ return valueQuery.get().toArray(new Integer[0]);
+ } catch (Exception e) {
}
- return elements;
+ return new Object[0];
}
public void dispose() {
@@ -140,6 +159,10 @@ public class SyncDataViewer
});
}
+ /**
+ * The entry point for the example.
+ * @param args Program arguments.
+ */
public static void main(String[] args) {
// Create the shell to hold the viewer.
Display display = new Display();
@@ -162,7 +185,8 @@ public class SyncDataViewer
//#endif
// Create the content provider which will populate the viewer.
- SyncDataViewer contentProvider = new SyncDataViewer(tableViewer, generator);
+ SyncDataViewer contentProvider =
+ new SyncDataViewer(tableViewer, generator);
tableViewer.setContentProvider(contentProvider);
tableViewer.setInput(new Object());

Back to the top