Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 8bbe1b05238725ac55e3b1414738aaa4df38b8ae (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
/*******************************************************************************
 * Copyright (c) 2007, 2010 Wind River Systems, Inc. and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Wind River Systems - initial API and implementation
 *******************************************************************************/
package org.eclipse.tm.tcf.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.tm.tcf.protocol.IChannel;
import org.eclipse.tm.tcf.protocol.Protocol;

/**
 * A <tt>TCFTask</tt> is an utility class that represents the result of an asynchronous
 * communication over TCF framework.  Methods are provided to check if the communication is
 * complete, to wait for its completion, and to retrieve the result of
 * the communication.
 *
 * TCFTask is useful when communication is requested by a thread other then TCF dispatch thread.
 * If client has a global state, for example, cached remote data, multithreading should be avoided,
 * because it is extremely difficult to ensure absence of racing conditions or deadlocks in such environment.
 * Such clients should consider message driven design, see TCFDataCache and its usage as an example.
 *
 * If a client is extending TCFTask it should implement run() method to perform actual communications.
 * The run() method will be execute by TCF dispatch thread, and client code should then call either done() or
 * error() to indicate that task computations are complete.
 */
public abstract class TCFTask<V> implements Runnable, Future<V> {

    private V result;
    private boolean done;
    private Throwable error;
    private boolean canceled;
    private IChannel channel;
    private IChannel.IChannelListener channel_listener;

    /**
     * Construct a TCF task object and schedule it for execution.
     */
    public TCFTask() {
        Protocol.invokeLater(new Runnable() {
            public void run() {
                try {
                    TCFTask.this.run();
                }
                catch (Throwable x) {
                    if (!done && error == null) error(x);
                }
            }
        });
    }

    /**
     * Construct a TCF task object and schedule it for execution.
     * The task will be canceled if it is not completed after given timeout.
     * @param timeout - max time in milliseconds.
     */
    public TCFTask(long timeout) {
        Protocol.invokeLater(new Runnable() {
            public void run() {
                try {
                    TCFTask.this.run();
                }
                catch (Throwable x) {
                    if (!done && error == null) error(x);
                }
            }
        });
        Protocol.invokeLater(timeout, new Runnable() {
            public void run() {
                cancel(true);
            }
        });
    }

    /**
     * Construct a TCF task object and schedule it for execution.
     * The task will be canceled if the given channel is closed or
     * terminated while the task is in progress.
     * @param channel
     */
    public TCFTask(final IChannel channel) {
        Protocol.invokeLater(new Runnable() {
            public void run() {
                try {
                    if (channel.getState() != IChannel.STATE_OPEN) throw new Exception("Channel is closed");
                    TCFTask.this.channel = channel;
                    channel_listener = new IChannel.IChannelListener() {

                        public void congestionLevel(int level) {
                        }

                        public void onChannelClosed(final Throwable error) {
                            cancel(true);
                        }

                        public void onChannelOpened() {
                        }
                    };
                    channel.addChannelListener(channel_listener);
                    TCFTask.this.run();
                }
                catch (Throwable x) {
                    if (!done && error == null) error(x);
                }
            }
        });
    }

    /**
     * Set a result of this task and notify all threads waiting for the task to complete.
     * The method is supposed to be called in response to executing of run() method of this task.
     *
     * @param result - the computed result
     */
    public synchronized void done(V result) {
        assert Protocol.isDispatchThread();
        if (canceled) return;
        assert !done;
        assert this.error == null;
        assert this.result == null;
        this.result = result;
        done = true;
        if (channel != null) channel.removeChannelListener(channel_listener);
        notifyAll();
    }

    /**
     * Set a error and notify all threads waiting for the task to complete.
     * The method is supposed to be called in response to executing of run() method of this task.
     *
     * @param error - computation error.
     */
    public synchronized void error(Throwable error) {
        assert Protocol.isDispatchThread();
        assert error != null;
        if (canceled) return;
        assert this.error == null;
        assert this.result == null;
        assert !done;
        this.error = error;
        if (channel != null) channel.removeChannelListener(channel_listener);
        notifyAll();
    }

    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, already been canceled,
     * or could not be canceled for some other reason. If successful,
     * and this task has not started when <tt>cancel</tt> is called,
     * this task should never run.  If the task has already started,
     * then the <tt>mayInterruptIfRunning</tt> parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return <tt>false</tt> if the task could not be canceled,
     * typically because it has already completed normally;
     * <tt>true</tt> otherwise
     */
    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
        assert Protocol.isDispatchThread();
        if (isDone()) return false;
        canceled = true;
        error = new CancellationException();
        if (channel != null) channel.removeChannelListener(channel_listener);
        notifyAll();
        return true;
    }

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was canceled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public synchronized V get() throws InterruptedException, ExecutionException {
        assert !Protocol.isDispatchThread();
        while (!isDone()) wait();
        if (error != null) {
            if (error instanceof ExecutionException) throw (ExecutionException)error;
            if (error instanceof InterruptedException) throw (InterruptedException)error;
            throw new ExecutionException("TCF task aborted", error);
        }
        return result;
    }

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws Error if the computation was canceled or threw an exception
     */
    public synchronized V getE() {
        assert !Protocol.isDispatchThread();
        while (!isDone()) {
            try {
                wait();
            }
            catch (InterruptedException x) {
                throw new Error(x);
            }
        }
        if (error != null) {
            if (error instanceof Error) throw (Error)error;
            throw new Error("TCF task aborted", error);
        }
        return result;
    }

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws IOException if the computation was canceled or threw an exception
     */
    public synchronized V getIO() throws IOException {
        assert !Protocol.isDispatchThread();
        while (!isDone()) {
            try {
                wait();
            }
            catch (InterruptedException x) {
                throw new InterruptedIOException();
            }
        }
        if (error != null) {
            if (error instanceof IOException) throw (IOException)error;
            IOException y = new IOException("TCF task aborted");
            y.initCause(error);
            throw y;
        }
        return result;
    }

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was canceled
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    public synchronized V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        assert !Protocol.isDispatchThread();
        if (!isDone()) {
            wait(unit.toMillis(timeout));
            if (!isDone()) throw new TimeoutException();
        }
        if (error != null) {
            if (error instanceof InterruptedException) throw (InterruptedException)error;
            if (error instanceof ExecutionException) throw (ExecutionException)error;
            if (error instanceof TimeoutException) throw (TimeoutException)error;
            throw new ExecutionException("TCF task aborted", error);
        }
        return result;
    }

    /**
     * Returns <tt>true</tt> if this task was canceled before it completed
     * normally.
     *
     * @return <tt>true</tt> if task was canceled before it completed
     */
    public synchronized boolean isCancelled() {
        return canceled;
    }

    /**
     * Returns <tt>true</tt> if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * <tt>true</tt>.
     *
     * @return <tt>true</tt> if this task completed.
     */
    public synchronized boolean isDone() {
        return error != null || done;
    }

    /**
     * Return task execution error if any.
     * @return Throwable object or null
     */
    protected Throwable getError() {
        return error;
    }

    /**
     * Return task execution result if any.
     * @return result object
     */
    protected V getResult() {
        return result;
    }
}

Back to the top