1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
|
#******************************************************************************
# Copyright (c) 2011 Wind River Systems, Inc. and others.
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# which accompanies this distribution, and is available at
# http://www.eclipse.org/legal/epl-v10.html
#
# Contributors:
# Wind River Systems - initial API and implementation
#******************************************************************************
import threading, exceptions
from tcf import protocol
class Task(object):
    """
    A Task is a utility class that represents the result of an asynchronous
    communication over the TCF framework. Methods are provided to check if the
    communication is complete, to wait for its completion, and to retrieve the
    result of the communication.

    Task is useful when communication is requested by a thread other than the
    TCF dispatch thread. If a client has global state, for example cached
    remote data, multithreading should be avoided, because it is extremely
    difficult to ensure the absence of race conditions or deadlocks in such an
    environment. Such clients should consider a message driven design, see
    DataCache and its usage as an example.

    If a client is extending Task it should implement the run() method to
    perform the actual communication. The run() method will be executed by the
    TCF dispatch thread, and client code should then call either done() or
    error() to indicate that the task computation is complete.
    """

    # Class-level defaults; instances shadow them as the task progresses.
    __result = None
    __is_done = False
    __error = None
    __canceled = False
    # Never assigned by the code in this class, so it stays None (see the
    # NOTE(review) after __init__).
    __channel = None

    def __init__(self, target=None, *args, **kwargs):
        """
        Construct a TCF task object and schedule it for execution.

        @param target - optional callable to execute via protocol.invokeLater.
                        When given, it is called with an extra "done" keyword
                        argument: a callback taking (error, result) that
                        completes this task. When omitted, run() is executed.
        @param args - positional arguments forwarded to the target.
        @param kwargs - keyword arguments forwarded to the target. A truthy
                        "timeout" entry additionally schedules cancel() through
                        protocol.invokeLaterWithDelay(timeout, ...).
        """
        if target:
            kwargs["done"] = self.__done
        else:
            target = self.run
        self._target = target
        self._args = args
        self._kwargs = kwargs
        self._lock = threading.Condition()
        protocol.invokeLater(self.__doRun)
        # NOTE(review): "timeout" is only read here, not removed, so it is
        # also passed on to the target as a keyword argument - confirm that
        # targets are expected to accept it.
        timeout = kwargs.get("timeout")
        if timeout:
            protocol.invokeLaterWithDelay(timeout, self.cancel)

    # NOTE(review): the Java original also has a constructor taking an
    # IChannel: it fails the task if the channel is not open, stores the
    # channel and registers a channel listener that cancels the task when the
    # channel is closed. That variant is not ported here, so __channel stays
    # None and self.channel_listener is never assigned by this class; the
    # removeChannelListener() call in __finish() only runs once that port
    # exists.

    def __doRun(self):
        """
        Invoke the target; an exception it raises becomes the task error
        unless the task already has a result or an error.
        """
        try:
            self._target(*self._args, **self._kwargs)
        except Exception as x:
            if not self.__is_done and self.__error is None:
                self.error(x)

    def __done(self, error, result):
        """
        Completion callback handed to an explicit target as the "done" keyword
        argument: routes to error() or done().
        """
        if error:
            self.error(error)
        else:
            self.done(result)

    def __finish(self):
        """
        Shared completion step: detach the channel listener, if a channel is
        set, and wake up every thread blocked in get().
        The caller must hold self._lock.
        """
        if self.__channel:
            self.__channel.removeChannelListener(self.channel_listener)
        self._lock.notify_all()

    def run(self, *args, **kwargs):
        """
        Perform the actual communication. Subclasses constructed without an
        explicit target must override this method.
        @throws NotImplementedError always, in this base implementation.
        """
        raise NotImplementedError("Abstract method")

    def done(self, result):
        """
        Set a result and notify all threads waiting for the task to complete.
        The call is ignored if the task has been canceled.
        @param result - the computed result.
        """
        with self._lock:
            assert protocol.isDispatchThread()
            if self.__canceled:
                return
            assert not self.__is_done
            assert not self.__error
            assert self.__result is None
            self.__result = result
            self.__is_done = True
            self.__finish()

    def error(self, error):
        """
        Set an error and notify all threads waiting for the task to complete.
        The method is supposed to be called in response to executing the run()
        method of this task. The call is ignored if the task has been canceled.
        @param error - computation error, must be truthy.
        """
        assert protocol.isDispatchThread()
        assert error
        with self._lock:
            if self.__canceled:
                return
            assert self.__error is None
            assert self.__result is None
            assert not self.__is_done
            self.__error = error
            self.__finish()

    def cancel(self):
        """
        Cancel the task: record a "Canceled" error and notify all waiting
        threads.
        @return False if the task had already completed, True if it was
                canceled by this call.
        """
        assert protocol.isDispatchThread()
        with self._lock:
            if self.isDone():
                return False
            self.__canceled = True
            self.__error = Exception("Canceled")
            self.__finish()
        return True

    def get(self, timeout=None):
        """
        Waits if necessary for the computation to complete, and then
        retrieves its result. Must not be called on the dispatch thread.
        @param timeout - optional limit, in seconds, for a single wait on the
                         condition variable; a falsy value waits indefinitely.
        @return the computed result
        @throws TimeoutException if the task is still not complete after
                waiting for the timeout
        @throws Exception "TCF task aborted", carrying the task error as the
                second argument, if the task failed or was canceled
        """
        assert not protocol.isDispatchThread()
        with self._lock:
            while not self.isDone():
                # A falsy timeout (None or 0) means "no limit": block instead
                # of spinning on a zero-length wait.
                self._lock.wait(timeout or None)
                if timeout and not self.isDone():
                    raise TimeoutException("Timed out")
            if self.__error:
                raise Exception("TCF task aborted", self.__error)
            return self.__result

    def isCancelled(self):
        """
        Returns True if this task was canceled before it completed normally.
        @return True if the task was canceled before it completed
        """
        with self._lock:
            return self.__canceled

    def isDone(self):
        """
        Returns True if this task completed.
        Completion may be due to normal termination, an exception, or
        cancellation -- in all of these cases, this method will return True.
        @return True if this task completed, False otherwise.
        """
        with self._lock:
            # Coerce to bool so the stored error object is not leaked through
            # a method documented as returning true/false.
            return bool(self.__error or self.__is_done)

    def getError(self):
        """
        Return the task execution error, if any.
        @return the error object or None
        """
        return self.__error

    def getResult(self):
        """
        Return the task result as stored by done().
        @return the result, or None if no result has been set
        """
        return self.__result
class TimeoutException(Exception):
    """Raised by Task.get() when the task has not completed within the timeout."""
    pass
|