Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 3689162e57f9ba7dce6bd250a64d8f1f2d7154da (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
/*******************************************************************************
 * Copyright (c) 2008, 2012 Wind River Systems and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 * 
 * Contributors:
 *     Wind River Systems - initial API and implementation
 *******************************************************************************/
package org.eclipse.tcf.debug.test.util;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.core.runtime.CoreException;
import org.eclipse.tcf.protocol.Protocol;

/**
 * Base class for transaction logic that synchronously consumes several
 * {@link ICache} objects whose values are normally obtained asynchronously.
 * <p>
 * Subclasses override {@link #process()} and call one of the
 * <code>validate</code> methods before reading each cache. When a cache is
 * not valid the transaction is aborted and automatically re-run (through
 * {@link #run()}) once the cache reports completion. The outcome is delivered
 * either to the {@link DataCallback} passed to {@link #request(DataCallback)}
 * or through the {@link Future} methods of this class, which lazily wrap the
 * transaction in a {@link Query}.
 *
 * @param <V> type of the value produced by the transaction
 * @since 2.2
 */
public abstract class Transaction<V> implements Future<V>, Runnable {

    /**
     * Exception type used to abort {@link #process()} when a cache that the
     * transaction depends on is stale or has never obtained a value.
     */
    public static class InvalidCacheException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    /**
     * The exception we throw when the client transaction logic asks us to
     * validate a cache object that is stale (or has never obtained a value
     * from the source). A single shared instance is thrown every time.
     */
    public static final InvalidCacheException INVALID_CACHE_EXCEPTION = new InvalidCacheException();

    /**
     * The request object we've been given to set the transaction results in.
     * It is <code>null</code> while no request is outstanding.
     */
    private DataCallback<V> fRm;

    /** Lazily created query backing the {@link Future} methods. */
    private Query<V> fQuery;

    /**
     * Kicks off the transaction. We'll either complete the request monitor
     * immediately if all the data points the transaction needs are cached and
     * valid, or we'll end up asynchronously completing the monitor if and when
     * either (a) all the data points are available and up-to-date, or (b)
     * obtaining them from the source encountered an error. Note that there is
     * potential in (b) for us to never complete the monitor. If one or more
     * data points are perpetually becoming stale, then we'll indefinitely wait
     * for them to stabilize. The caller should cancel its request monitor in
     * order to get us to stop waiting.
     * <p>
     * If a previous request monitor is still held, it is expected to have
     * been canceled (this is asserted); it is completed through
     * <code>done()</code> before being replaced.
     *
     * @param rm Request completion monitor.
     */
    public void request(DataCallback<V> rm) {
        if (fRm != null) {
            assert fRm.isCanceled();
            fRm.done();
        }
        fRm = rm;
        assert fRm != null;
        run();
    }

    /**
     * Hook invoked by {@link #run()} immediately before every attempt to
     * execute the transaction logic. The default implementation does nothing.
     */
    protected void preProcess() {}

    /**
     * Hook invoked by {@link #run()} after every attempt to execute the
     * transaction logic. The default implementation does nothing.
     *
     * @param done <code>true</code> if the attempt completed the request
     * (with data or with an error), <code>false</code> if the transaction is
     * still waiting for caches to become valid
     * @param data the data stored in the request monitor when
     * <code>done</code> is <code>true</code>, otherwise <code>null</code>
     * @param error the error stored in the request monitor when
     * <code>done</code> is <code>true</code>, otherwise <code>null</code>
     */
    protected void postProcess(boolean done, V data, Throwable error) {}

    /**
     * Runs {@link #process()} and translates its outcome into the request
     * monitor state.
     *
     * @return <code>true</code> if the request monitor was given a result
     * (data or error) and should be completed, <code>false</code> if a cache
     * was invalid and the transaction has to be attempted again later
     */
    protected boolean processUnchecked() {
        try {
            // Execute the transaction logic
            V data = process();

            // No exception means all cache objects used by the transaction
            // were valid and up to date. Complete the request
            setData(data);
            return true;
        }
        catch (InvalidCacheException e) {
            // At least one of the cache objects was stale/unset. Keep the
            // request monitor in the incomplete state, thus leaving our client
            // "waiting" (asynchronously). We'll get called again once the cache
            // objects are updated, thus re-starting the whole transaction
            // attempt.
            return false;
        }
        catch (Throwable e) {
            // At least one of the cache objects encountered a failure obtaining
            // the data from the source (or the transaction logic itself
            // failed). Complete the request with the error.
            setError(e);
            return true;
        }
    }

    /**
     * The transaction logic--code that tries to synchronously make use of,
     * usually, multiple data points that are normally obtained asynchronously.
     * Each data point is represented by a cache object. The transaction logic
     * must check the validity of each cache object just prior to using it
     * (calling its getData()). It should do that check by calling one of our
     * validate() methods. Those methods will throw InvalidCacheException if the
     * cached data is invalid (stale, e.g.,) or ExecutionException if an error
     * was encountered the last time it got data from the source. The exception
     * will abort the transaction, but in the case of InvalidCacheException, we
     * schedule an asynchronous call that will re-invoke the transaction
     * logic once the cache object has been updated from the source.
     * <p>
     * The default implementation returns <code>null</code>.
     *
     * @return the result of the transaction
     * @throws Transaction.InvalidCacheException Exception indicating that a
     * cache is not valid and transaction will need to be rescheduled.
     * @throws ExecutionException Exception indicating that one of the caches
     * is in error state and transaction cannot be processed.
     */
    protected V process() throws InvalidCacheException, ExecutionException {
        return null;
    }

    /**
     * Stores the transaction result in the current request monitor.
     * Can be called only while in process().
     *
     * @param data the result of the transaction
     */
    protected void setData(V data) {
        assert Protocol.isDispatchThread();
        fRm.setData(data);
    }

    /**
     * Stores a failure in the current request monitor.
     * Can be called only while in process().
     *
     * @param error the failure that prevented the transaction from completing
     */
    protected void setError(Throwable error) {
        assert Protocol.isDispatchThread();
        fRm.setError(error);
    }

    /**
     * Method which invokes the transaction logic and handles any exception that
     * may result. If that logic encounters a stale/unset cache object, then we
     * simply do nothing. This method can be invoked by transaction logic when
     * caches have become valid, thus unblocking transaction processing.
     */
    public void run() {
        // If run is called after transaction completes (as a result of a
        // cancelled request completing for example), ignore it.
        if (fRm == null) {
            return;
        }

        // A canceled request is completed without running the logic.
        if (fRm.isCanceled()) {
            fRm.done();
            fRm = null;
            return;
        }

        preProcess();
        if (processUnchecked()) {
            // Report the outcome to the hook before the monitor is completed
            // and released.
            postProcess(true, fRm.getData(), fRm.getError());
            fRm.done();
            fRm = null;
        } else {
            postProcess(false, null, null);
        }
    }

    /**
     * Clients must call one of the validate methods prior to using (calling
     * getData()) on data cache object.
     *
     * @param cache
     *            the object being validated
     * @return the data held by the cache
     * @throws InvalidCacheException
     *             if the data is stale/unset
     * @throws ExecutionException
     *             if an error was encountered getting the data from the source
     */
    public <T> T validate(ICache<T> cache) throws InvalidCacheException, ExecutionException {
        if (!cache.isValid()) {
            // Throw the invalid cache exception, but first ask the cache to
            // update itself from its source, and schedule a re-attempt of the
            // transaction logic to occur when the stale/unset cache has been
            // updated
            retryWhenValid(cache);
            throw INVALID_CACHE_EXCEPTION;
        }

        Throwable error = cache.getError();
        if (error != null) {
            throw new ExecutionException(error);
        }
        return cache.getData();
    }

    /**
     * See {@link #validate(ICache)}. This variant simply validates
     * multiple cache objects.
     */
    public void  validate(ICache<?> ... caches) throws InvalidCacheException, ExecutionException {
        validate(Arrays.asList(caches));
    }

    /**
     * See {@link #validate(ICache)}. This variant validates
     * multiple cache objects. An error found in a valid cache takes
     * precedence: in that case no re-attempt is scheduled.
     *
     * @param caches the {@link ICache} objects being validated
     * @throws InvalidCacheException
     *             if at least one cache is stale/unset
     * @throws ExecutionException
     *             if a valid cache holds an error
     */
    public void validate(@SuppressWarnings("rawtypes") Iterable caches) throws InvalidCacheException, ExecutionException {
        // Check if any of the caches have errors:
        boolean allValid = true;

        for (Object cacheObj : caches) {
            ICache<?> cache = (ICache<?>)cacheObj;
            if (cache.isValid()) {
                Throwable error = cache.getError();
                if (error != null) {
                    throw new ExecutionException(error);
                }
            } else {
                allValid = false;
            }
        }
        if (!allValid) {
            // Throw the invalid cache exception, but first schedule a
            // re-attempt of the transaction logic, to occur when the
            // stale/unset cache objects have been updated
            retryWhenAllValid(caches);
            throw INVALID_CACHE_EXCEPTION;
        }
    }

    /**
     * See {@link #validate(ICache)}.  This variant does not throw exceptions,
     * instead it returns <code>false</code> if the cache is not valid.  If the
     * given cache is valid, and this method returns <code>true</code>, clients
     * must still check if the cache contains an error before retrieving its
     * data through {@link ICache#getData()}.
     *
     * @param cache the object being validated
     * @return returns <code>false</code> if the cache is not yet valid and
     * transaction processing should be interrupted.
     */
    public boolean validateUnchecked(ICache<?> cache) {
        if (cache.isValid()) {
            return true;
        }
        // Just ask the cache to update itself from its source, and schedule a
        // re-attempt of the transaction logic to occur when the stale/unset
        // cache has been updated
        retryWhenValid(cache);
        return false;
    }

    /**
     * See {@link #validateUnchecked(ICache)}. This variant validates
     * multiple cache objects.
     */
    public boolean validateUnchecked(ICache<?> ... caches) {
        return validateUnchecked(Arrays.asList(caches));
    }

    /**
     * See {@link #validateUnchecked(ICache)}. This variant validates
     * multiple cache objects.
     *
     * @param caches the {@link ICache} objects being validated
     * @return <code>true</code> if every cache is valid, <code>false</code>
     * if at least one is not and a re-attempt has been scheduled
     */
    public boolean validateUnchecked(@SuppressWarnings("rawtypes") Iterable caches) {
        // Check if all caches are valid
        boolean allValid = true;

        for (Object cacheObj : caches) {
            ICache<?> cache = (ICache<?>)cacheObj;
            if (!cache.isValid()) {
                allValid = false;
            }
        }
        if (allValid) {
            return true;
        }

        // Just schedule a re-attempt of the transaction logic, to occur
        // when the stale/unset cache objects have been updated
        retryWhenAllValid(caches);
        return false;
    }

    /**
     * Registers a callback on the given cache that re-runs the transaction
     * when the cache reports completion.
     */
    private void retryWhenValid(ICache<?> cache) {
        cache.wait(new Callback(fRm) {
            @Override
            protected void handleCompleted() {
                run();
            }
        });
    }

    /**
     * Registers one shared aggregate callback on every cache in the given
     * collection that is currently not valid. The callback re-runs the
     * transaction once it has been completed as many times as there were
     * invalid caches.
     */
    private void retryWhenAllValid(@SuppressWarnings("rawtypes") Iterable caches) {
        AggregateCallback countingRm = new AggregateCallback(fRm) {
            @Override
            protected void handleCompleted() {
                run();
            }
        };
        int pending = 0;
        for (Object cacheObj : caches) {
            ICache<?> cache = (ICache<?>)cacheObj;
            if (!cache.isValid()) {
                cache.wait(countingRm);
                pending++;
            }
        }
        // The expected number of completions is only known after the loop.
        countingRm.setDoneCount(pending);
    }

    /**
     * Returns the query backing the {@link Future} methods.
     *
     * @param create if <code>true</code> and no query exists yet, a query
     * that forwards to {@link #request(DataCallback)} is created and invoked
     * @return the query, or <code>null</code> if none exists and
     * <code>create</code> is <code>false</code>
     */
    private synchronized Query<V> getQuery(boolean create) {
        if (fQuery == null && create) {
            fQuery = new Query<V>() {
                @Override
                protected void execute(DataCallback<V> callback) {
                    request(callback);
                }
            };
            fQuery.invoke();
        }

        return fQuery;
    }

    /**
     * Cancels the backing query, if one has been started.
     *
     * @return the result of canceling the query, or <code>false</code> if
     * the transaction has not been started through <code>get</code> yet
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        Query<V> query = getQuery(false);
        return query != null && query.cancel(mayInterruptIfRunning);
    }

    /**
     * @return whether the backing query was canceled; <code>false</code> if
     * no query has been started yet
     */
    public boolean isCancelled() {
        Query<V> query = getQuery(false);
        return query != null && query.isCancelled();
    }

    /**
     * @return whether the backing query is done; <code>false</code> if no
     * query has been started yet
     */
    public boolean isDone() {
        Query<V> query = getQuery(false);
        return query != null && query.isDone();
    }

    /**
     * Starts the transaction if it has not been started yet and waits for
     * its result.
     */
    public V get() throws InterruptedException, ExecutionException {
        return getQuery(true).get();
    }

    /**
     * Starts the transaction if it has not been started yet and waits for
     * its result for at most the given time.
     */
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return getQuery(true).get(timeout, unit);
    }

}

Back to the top