Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'python/src/tcf/util/task.py')
-rw-r--r--python/src/tcf/util/task.py204
1 files changed, 204 insertions, 0 deletions
diff --git a/python/src/tcf/util/task.py b/python/src/tcf/util/task.py
new file mode 100644
index 000000000..e4230c9dc
--- /dev/null
+++ b/python/src/tcf/util/task.py
@@ -0,0 +1,204 @@
+#******************************************************************************
+# Copyright (c) 2011 Wind River Systems, Inc. and others.
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Public License v1.0
+# which accompanies this distribution, and is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Contributors:
+# Wind River Systems - initial API and implementation
+#******************************************************************************
+
+import threading, exceptions
+from tcf import protocol
+
+class Task(object):
+ """
+ A <tt>Task</tt> is an utility class that represents the result of an asynchronous
+ communication over TCF framework. Methods are provided to check if the communication is
+ complete, to wait for its completion, and to retrieve the result of
+ the communication.
+
+ Task is useful when communication is requested by a thread other then TCF dispatch thread.
+ If client has a global state, for example, cached remote data, multithreading should be avoided,
+ because it is extremely difficult to ensure absence of racing conditions or deadlocks in such environment.
+ Such clients should consider message driven design, see DataCache and its usage as an example.
+
+ If a client is extending Task it should implement run() method to perform actual communications.
+ The run() method will be execute by TCF dispatch thread, and client code should then call either done() or
+ error() to indicate that task computations are complete.
+ """
+ __result = None
+ __is_done = False
+ __error = None
+ __canceled = False
+ __channel = None
+
+ def __init__(self, target=None, *args, **kwargs):
+ """
+ Construct a TCF task object and schedule it for execution.
+ """
+ if target:
+ kwargs["done"] = self.__done
+ else:
+ target = self.run
+
+ self._target = target
+ self._args = args
+ self._kwargs = kwargs
+ self._lock = threading.Condition()
+ protocol.invokeLater(self.__doRun)
+ timeout = kwargs.get("timeout")
+ if timeout:
+ protocol.invokeLaterWithDelay(timeout, self.cancel)
+
+# """
+# Construct a TCF task object and schedule it for execution.
+# The task will be __canceled if the given __channel is closed or
+# terminated while the task is in progress.
+# @param __channel
+# """
+# public Task(final IChannel __channel) {
+# Protocol.invokeLater(new Runnable() {
+# public void run() {
+# try {
+# if (__channel.getState() != IChannel.STATE_OPEN) throw new Exception("Channel is closed")
+# Task.this.__channel = __channel
+# channel_listener = new IChannel.IChannelListener() {
+#
+# public void congestionLevel(int level) {
+# }
+#
+# public void onChannelClosed(final Throwable __error) {
+# cancel(true)
+# }
+#
+# public void onChannelOpened() {
+# }
+# }
+# __channel.addChannelListener(channel_listener)
+# Task.this.run()
+# }
+# catch (Throwable x) {
+# if (!__is_done && __error == null) error(x)
+# }
+# }
+# })
+# }
+
+ def __doRun(self):
+ try:
+ self._target(*self._args, **self._kwargs)
+ except exceptions.Exception as x:
+ if not self.__is_done and self.__error is None:
+ self.error(x)
+
+ def __done(self, error, result):
+ if error:
+ self.error(error)
+ else:
+ self.done(result)
+
+ def run(self, *args, **kwargs):
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def done(self, result):
+ with self._lock:
+ assert protocol.isDispatchThread()
+ if self.__canceled: return
+ assert not self.__is_done
+ assert not self.__error
+ assert self.__result is None
+ self.__result = result
+ self.__is_done = True
+ if self.__channel:
+ self.__channel.removeChannelListener(self.channel_listener)
+ self._lock.notifyAll()
+
+ def error(self, error):
+ """
+ Set a __error and notify all threads waiting for the task to complete.
+ The method is supposed to be called in response to executing of run() method of this task.
+
+ @param __error - computation __error.
+ """
+ assert protocol.isDispatchThread()
+ assert error
+ with self._lock:
+ if self.__canceled: return
+ assert self.__error is None
+ assert self.__result is None
+ assert not self.__is_done
+ self.__error = error
+ if self.__channel:
+ self.__channel.removeChannelListener(self.channel_listener)
+ self._lock.notifyAll()
+
+ def cancel(self):
+ assert protocol.isDispatchThread()
+ with self._lock:
+ if self.isDone(): return False
+ self.__canceled = True
+ self.__error = exceptions.Exception("Canceled")
+ if self.__channel:
+ self.__channel.removeChannelListener(self.channel_listener)
+ self._lock.notifyAll()
+ return True
+
+ def get(self, timeout=None):
+ """
+ Waits if necessary for the computation to complete, and then
+ retrieves its result.
+
+ @return the computed result
+ @throws CancellationException if the computation was __canceled
+ @throws ExecutionException if the computation threw an
+ exception
+ @throws InterruptedException if the current thread was interrupted
+ while waiting
+ """
+ assert not protocol.isDispatchThread()
+ with self._lock:
+ while not self.isDone():
+ self._lock.wait(timeout)
+ if timeout and not self.isDone():
+ raise TimeoutException("Timed out")
+ if self.__error:
+ raise exceptions.Exception("TCF task aborted", self.__error)
+ return self.__result
+
+ def isCancelled(self):
+ """
+ Returns <tt>true</tt> if this task was __canceled before it completed
+ normally.
+
+ @return <tt>true</tt> if task was __canceled before it completed
+ """
+ with self._lock:
+ return self.__canceled
+
+ def isDone(self):
+ """
+ Returns <tt>true</tt> if this task completed.
+
+ Completion may be due to normal termination, an exception, or
+ cancellation -- in all of these cases, this method will return
+ <tt>true</tt>.
+
+ @return <tt>true</tt> if this task completed.
+ """
+ with self._lock:
+ return self.__error or self.__is_done
+
+ def getError(self):
+ """
+ Return task execution __error if any.
+ @return Throwable object or null
+ """
+ return self.__error
+
+ def getResult(self):
+ return self.__result
+
+class TimeoutException(exceptions.Exception):
+ pass

Back to the top