#******************************************************************************
# Copyright (c) 2011, 2013 Wind River Systems, Inc. and others.
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# which accompanies this distribution, and is available at
# http://www.eclipse.org/legal/epl-v10.html
#
# Contributors:
# Wind River Systems - initial API and implementation
#******************************************************************************

import threading

from .. import protocol, channel


class Task(object):
    """A Task is a utility class that represents the result of an asynchronous
    communication over TCF framework.

    Methods are provided to check if the communication is complete, to wait
    for its completion, and to retrieve the result of the communication.

    Task is useful when communication is requested by a thread other than the
    TCF dispatch thread. If client has a global state, for example, cached
    remote data, multithreading should be avoided, because it is extremely
    difficult to ensure absence of racing conditions or deadlocks in such
    environment. Such clients should consider message driven design, see
    DataCache and its usage as an example.

    If a client is extending Task it should implement run() method to perform
    actual communications. The run() method will be executed by the TCF
    dispatch thread, and client code should then call either done() or
    error() to indicate that task computations are complete.
    """

    # Class-level defaults; instances overwrite them as the task progresses.
    __result = None
    __is_done = False
    __error = None
    __canceled = False
    __channel_listener = None

    def __init__(self, target=None, *args, **kwargs):
        """Construct a TCF task object and schedule it for execution.

        @param target - optional callable to execute instead of run(). When
                        given, it is called with an extra "done" keyword
                        argument: a callback taking (error, result).
        @param args - positional arguments forwarded to the target.
        @param kwargs - keyword arguments forwarded to the target, except for
                        two control keys that are consumed here:
                        "channel" - if set, the task fails when that channel
                                    is not open and is canceled when it
                                    closes;
                        "timeout" - if set, delay handed to
                                    protocol.invokeLaterWithDelay() after
                                    which the task is canceled.
        """
        if target:
            kwargs["done"] = self.__done
        else:
            target = self.run

        # Consume the control keywords BEFORE the task is scheduled: kwargs
        # is the very dict handed to the target, so popping after
        # invokeLater() could race with the dispatch thread and leak
        # "timeout" into the target call.
        self.__channel = kwargs.pop("channel", None)
        timeout = kwargs.pop("timeout", None)

        self._target = target
        self._args = args
        self._kwargs = kwargs
        self._lock = threading.Condition()

        protocol.invokeLater(self.__doRun)
        if timeout:
            protocol.invokeLaterWithDelay(timeout, self.cancel)

    def __doRun(self):
        """Execute the task target; scheduled by the constructor."""
        try:
            if self.__channel:
                if self.__channel.getState() != channel.STATE_OPEN:
                    raise Exception("Channel is closed")
                task = self

                class CancelOnClose(channel.ChannelListener):
                    def onChannelClosed(self, error):
                        task.cancel(True)

                self.__channel_listener = CancelOnClose()
                self.__channel.addChannelListener(self.__channel_listener)
            self._target(*self._args, **self._kwargs)
        except Exception as x:
            # Report the failure only if the task has not completed already.
            if not self.__is_done and self.__error is None:
                self.error(x)

    def __done(self, error, result):
        """Callback passed to the target as "done": route to error()/done()."""
        if error:
            self.error(error)
        else:
            self.done(result)

    def run(self, *args, **kwargs):
        """Perform the actual communication; must be overridden by
        subclasses that do not pass a target to the constructor.
        """
        raise NotImplementedError("Abstract method")

    def done(self, result):
        """Set the result and notify all threads waiting for the task to
        complete. Does nothing if the task was already canceled.

        @param result - the computed result
        """
        with self._lock:
            assert protocol.isDispatchThread()
            if self.__canceled:
                return
            assert not self.__is_done
            assert not self.__error
            assert self.__result is None
            self.__result = result
            self.__is_done = True
            if self.__channel:
                self.__channel.removeChannelListener(self.__channel_listener)
            self._lock.notify_all()

    def error(self, error):
        """Set an error and notify all threads waiting for the task to
        complete. Does nothing if the task was already canceled.

        The method is supposed to be called in response to executing of run()
        method of this task.

        @param error - computation error.
        """
        assert protocol.isDispatchThread()
        assert error
        with self._lock:
            if self.__canceled:
                return
            assert self.__error is None
            assert self.__result is None
            assert not self.__is_done
            self.__error = error
            if self.__channel:
                self.__channel.removeChannelListener(self.__channel_listener)
            self._lock.notify_all()

    def cancel(self, mayInterruptIfRunning=False):
        """Cancel the task and notify all threads waiting for it.

        @param mayInterruptIfRunning - accepted so that callers (such as the
                                       channel-close listener installed by
                                       this class) may pass a flag; the value
                                       is ignored.
        @return True if the task was canceled by this call, False if it had
                already completed.
        """
        assert protocol.isDispatchThread()
        with self._lock:
            if self.isDone():
                return False
            self.__canceled = True
            self.__error = Exception("Canceled")
            if self.__channel:
                self.__channel.removeChannelListener(self.__channel_listener)
            self._lock.notify_all()
        return True

    def get(self, timeout=None):
        """Waits if necessary for the computation to complete, and then
        retrieves its result. Must not be called on the TCF dispatch thread.

        @param timeout - optional timeout handed to Condition.wait(); when
                         it elapses without completion the wait is aborted.
        @return the computed result
        @throws TimeoutException if a timeout was given and the task did not
                complete in time
        @throws Exception("TCF task aborted", error) if the task was canceled
                or completed with an error
        """
        assert not protocol.isDispatchThread()
        with self._lock:
            while not self.isDone():
                self._lock.wait(timeout)
                if timeout and not self.isDone():
                    raise TimeoutException("Timed out")
            if self.__error:
                raise Exception("TCF task aborted", self.__error)
            return self.__result

    def isCancelled(self):
        """Returns true if this task was canceled before it completed
        normally.

        @return true if task was canceled before it completed
        """
        with self._lock:
            return self.__canceled

    def isDone(self):
        """Returns true if this task completed.

        Completion may be due to normal termination, an exception, or
        cancellation -- in all of these cases, this method will return true.

        @return true if this task completed.
        """
        with self._lock:
            # Coerce to bool so callers get True/False, not the error object.
            return bool(self.__error or self.__is_done)

    def getError(self):
        """Return task execution error if any.

        @return Exception object or None
        """
        return self.__error

    def getResult(self):
        """Return the task result without waiting.

        @return the result passed to done(), or None if there is none (yet)
        """
        return self.__result


class TimeoutException(Exception):
    """Raised by Task.get() when the wait timeout elapses before the task
    completes.
    """
    pass