Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'python/src/tcf/util')
-rw-r--r--python/src/tcf/util/__init__.py20
-rw-r--r--python/src/tcf/util/cache.py540
-rw-r--r--python/src/tcf/util/event.py172
-rw-r--r--python/src/tcf/util/logging.py108
-rw-r--r--python/src/tcf/util/sync.py444
-rw-r--r--python/src/tcf/util/task.py360
6 files changed, 822 insertions, 822 deletions
diff --git a/python/src/tcf/util/__init__.py b/python/src/tcf/util/__init__.py
index cf36110a7..29aa42bc1 100644
--- a/python/src/tcf/util/__init__.py
+++ b/python/src/tcf/util/__init__.py
@@ -1,10 +1,10 @@
-# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
-# * All rights reserved. This program and the accompanying materials
-# * are made available under the terms of the Eclipse Public License v1.0
-# * which accompanies this distribution, and is available at
-# * http://www.eclipse.org/legal/epl-v10.html
-# *
-# * Contributors:
-# * Wind River Systems - initial API and implementation
-# *******************************************************************************
+# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
diff --git a/python/src/tcf/util/cache.py b/python/src/tcf/util/cache.py
index aaa8622c3..7e33c535e 100644
--- a/python/src/tcf/util/cache.py
+++ b/python/src/tcf/util/cache.py
@@ -1,270 +1,270 @@
-# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
-# * All rights reserved. This program and the accompanying materials
-# * are made available under the terms of the Eclipse Public License v1.0
-# * which accompanies this distribution, and is available at
-# * http://www.eclipse.org/legal/epl-v10.html
-# *
-# * Contributors:
-# * Wind River Systems - initial API and implementation
-# *******************************************************************************
-
-import cStringIO
-from tcf import protocol, channel
-
-class DataCache(object):
- """
- Objects of this class are used to cache TCF remote data.
- The cache is asynchronous state machine. The states are:
- 1. Valid - cache is in sync with remote data, use getError() and getData() to get cached data
- 2. Invalid - cache is out of sync, start data retrieval by calling validate()
- 3. Pending - cache is waiting result of a command that was sent to remote peer
- 4. Disposed - cache is disposed and cannot be used to store data.
-
- A cache instance can be created on any data type that needs to be cached.
- Examples might be context children list, context properties, context state, memory data,
- register data, symbol, variable, etc.
- Clients of cache items can register for cache changes, but don't need to think about any particular events
- since that is handled by the cache item itself.
-
- A typical cache client should implement Runnable interface.
- The implementation of run() method should:
-
- Validate all cache items required for client task.
- If anything is invalid then client should not alter any shared data structures,
- should discard any intermediate results and register (wait) for changes of invalid cache instance(s) state.
- When cache item state changes, client is invoked again and full validation is restarted.
- Once everything is valid, client completes its task in a single dispatch cycle.
-
- Note: clients should never retain copies of remote data across dispatch cycles!
- Such data would get out of sync and compromise data consistency.
- All remote data and everything derived from remote data should be kept in cache items
- that implement proper event handling and can keep data consistent across dispatch cycles.
- """
-
- __error = None
- __valid = None
- __posted = False
- __disposed = False
- __data = None
-
- def __init__(self, channel):
- assert channel
- self._channel = channel
- self._command = None
- self.__waiting_list = None
-
- def post(self):
- if self.__posted: return
- if not self.__waiting_list: return
- protocol.invokeLater(self)
- self.__posted = True
-
- def isValid(self):
- """
- @return True if cache contains up-to-date data or error.
- """
- return self.__valid
-
- def isPending(self):
- """
- @return True if data retrieval command is in progress.
- """
- return self._command is not None
-
- def isDisposed(self):
- """
- @return True if cache is disposed.
- """
- return self.__disposed
-
- def getError(self):
- """
- @return error object if data retrieval ended with an error, or None if retrieval was successful.
- Note: It is prohibited to call this method when cache is not valid.
- """
- assert self.__valid
- return self.__error
-
- def getData(self):
- """
- @return cached data object.
- Note: It is prohibited to call this method when cache is not valid.
- """
- assert protocol.isDispatchThread()
- assert self.__valid
- return self.__data
-
- def __call__(self):
- """
- Notify waiting clients about cache state change and remove them from wait list.
- It is responsibility of clients to check if the state change was one they are waiting for.
- Clients are not intended to call this method.
- """
- assert protocol.isDispatchThread()
- self.__posted = False
- if self.__waiting_list:
- arr = self.__waiting_list
- self.__waiting_list = None
- for r in arr:
- if isinstance(r, DataCache) and r._DataCache__posted: continue
- r()
- if self.__waiting_list is None: self.__waiting_list = arr
-
- def wait(self, cb):
- """
- Add a client call-back to cache wait list.
- Client call-backs are activated when cache state changes.
- Call-backs are removed from waiting list after that.
- It is responsibility of clients to check if the state change was one they are waiting for.
- @param cb - a call-back object
- """
- assert protocol.isDispatchThread()
- assert not self.__disposed
- assert not self.__valid
- if cb and not self.isWaiting(cb):
- if self.__waiting_list is None: self.__waiting_list = []
- self.__waiting_list.append(cb)
-
- def isWaiting(self, cb):
- """
- Return True if a client call-back is waiting for state changes of this cache item.
- @param cb - a call-back object.
- @return True if 'cb' is in the wait list.
- """
- if not self.__waiting_list: return False
- for r in self.__waiting_list:
- if r is cb: return True
- return False
-
- def __validate(self):
- """
- Initiate data retrieval if the cache is not valid.
- @return True if the cache is already valid
- """
- assert protocol.isDispatchThread()
- if self.__disposed or self._channel.getState() != channel.STATE_OPEN:
- self._command = None
- self.__valid = True
- self.__error = None
- self.__data = None
- else:
- if self._command is not None: return False
- if not self.__valid and not self.startDataRetrieval(): return False
- assert self.__valid
- assert self._command is None
- self.post()
- return True
-
- def validate(self, cb=None):
- """
- If the cache is not valid, initiate data retrieval and
- add a client call-back to cache wait list.
- Client call-backs are activated when cache state changes.
- Call-backs are removed from waiting list after that.
- It is responsibility of clients to check if the state change was one they are waiting for.
- If the cache is valid do nothing and return True.
- @param cb - a call-back object (optional)
- @return True if the cache is already valid
- """
- if not self.__validate():
- if cb: self.wait(cb)
- return False
- return True
-
- def start(self, command):
- """
- Start cache pending state.
- Pending state is when the cache is waiting for a TCF command to return results.
- @param command - TCF command handle.
- """
- assert not self.__valid
- assert command
- assert self._command is None
- self._command = command
-
- def done(self, command):
- """
- End cache pending state, but not mark the cache as valid.
- @param command - TCF command handle.
- """
- if self._command is not command: return
- assert not self.__valid
- self._command = None
- self.post()
-
- def set(self, token, error, data):
- """
- End cache pending state and mark the cache as valid.
- If 'token' != None, the data represent results from a completed command.
- The data should be ignored if current cache pending command is not same as 'token'.
- It can happen if the cache was reset or canceled during data retrieval.
- @param token - pending command handle or None.
- @param error - data retrieval error or None
- @param data - up-to-date data object
- """
- assert protocol.isDispatchThread()
- if token and self._command is not token: return
- self._command = None
- if not self.__disposed:
- assert not self.__valid
- if self._channel.getState() != channel.STATE_OPEN:
- self.__error = None
- self.__data = None
- self.__error = error
- self.__data = data
- self.__valid = True
- self.post()
-
- def reset(self, data=None):
- """
- Force cache to become valid, cancel pending data retrieval if data is provided.
- @param data - up-to-date data object (optional)
- """
- assert protocol.isDispatchThread()
- if data is not None and self._command is not None:
- self._command.cancel()
- self._command = None
- if not self.__disposed:
- self.__data = data
- self.__error = None
- self.__valid = True
- self.post()
-
- def cancel(self):
- """
- Invalidate the cache.
- Cancel pending data retrieval if any.
- """
- self.reset()
- if self._command is not None:
- self._command.cancel()
- self._command = None
-
- def dispose(self):
- """
- Dispose the cache.
- Cancel pending data retrieval if any.
- """
- self.cancel()
- self.__valid = True
- self.__disposed = True
-
- def __str__(self):
- bf = cStringIO.StringIO()
- bf.write('[')
- if self.__valid: bf.append("valid,")
- if self.__disposed: bf.write("disposed,")
- if self.__posted: bf.write("posted,")
- if self.__error is not None: bf.write("error,")
- bf.write("data=")
- bf.write(str(self.__data))
- bf.write(']')
- return bf.getvalue()
-
- def startDataRetrieval(self):
- """
- Sub-classes should override this method to implement actual data retrieval logic.
- @return True is all done, False if retrieval is in progress.
- """
- raise NotImplementedError("Abstract method")
+# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import cStringIO
+from tcf import protocol, channel
+
+class DataCache(object):
+ """
+ Objects of this class are used to cache TCF remote data.
+ The cache is asynchronous state machine. The states are:
+ 1. Valid - cache is in sync with remote data, use getError() and getData() to get cached data
+ 2. Invalid - cache is out of sync, start data retrieval by calling validate()
+ 3. Pending - cache is waiting result of a command that was sent to remote peer
+ 4. Disposed - cache is disposed and cannot be used to store data.
+
+ A cache instance can be created on any data type that needs to be cached.
+ Examples might be context children list, context properties, context state, memory data,
+ register data, symbol, variable, etc.
+ Clients of cache items can register for cache changes, but don't need to think about any particular events
+ since that is handled by the cache item itself.
+
+ A typical cache client should implement Runnable interface.
+ The implementation of run() method should:
+
+ Validate all cache items required for client task.
+ If anything is invalid then client should not alter any shared data structures,
+ should discard any intermediate results and register (wait) for changes of invalid cache instance(s) state.
+ When cache item state changes, client is invoked again and full validation is restarted.
+ Once everything is valid, client completes its task in a single dispatch cycle.
+
+ Note: clients should never retain copies of remote data across dispatch cycles!
+ Such data would get out of sync and compromise data consistency.
+ All remote data and everything derived from remote data should be kept in cache items
+ that implement proper event handling and can keep data consistent across dispatch cycles.
+ """
+
+ __error = None
+ __valid = None
+ __posted = False
+ __disposed = False
+ __data = None
+
+ def __init__(self, channel):
+ assert channel
+ self._channel = channel
+ self._command = None
+ self.__waiting_list = None
+
+ def post(self):
+ if self.__posted: return
+ if not self.__waiting_list: return
+ protocol.invokeLater(self)
+ self.__posted = True
+
+ def isValid(self):
+ """
+ @return True if cache contains up-to-date data or error.
+ """
+ return self.__valid
+
+ def isPending(self):
+ """
+ @return True if data retrieval command is in progress.
+ """
+ return self._command is not None
+
+ def isDisposed(self):
+ """
+ @return True if cache is disposed.
+ """
+ return self.__disposed
+
+ def getError(self):
+ """
+ @return error object if data retrieval ended with an error, or None if retrieval was successful.
+ Note: It is prohibited to call this method when cache is not valid.
+ """
+ assert self.__valid
+ return self.__error
+
+ def getData(self):
+ """
+ @return cached data object.
+ Note: It is prohibited to call this method when cache is not valid.
+ """
+ assert protocol.isDispatchThread()
+ assert self.__valid
+ return self.__data
+
+ def __call__(self):
+ """
+ Notify waiting clients about cache state change and remove them from wait list.
+ It is responsibility of clients to check if the state change was one they are waiting for.
+ Clients are not intended to call this method.
+ """
+ assert protocol.isDispatchThread()
+ self.__posted = False
+ if self.__waiting_list:
+ arr = self.__waiting_list
+ self.__waiting_list = None
+ for r in arr:
+ if isinstance(r, DataCache) and r._DataCache__posted: continue
+ r()
+ if self.__waiting_list is None: self.__waiting_list = arr
+
+ def wait(self, cb):
+ """
+ Add a client call-back to cache wait list.
+ Client call-backs are activated when cache state changes.
+ Call-backs are removed from waiting list after that.
+ It is responsibility of clients to check if the state change was one they are waiting for.
+ @param cb - a call-back object
+ """
+ assert protocol.isDispatchThread()
+ assert not self.__disposed
+ assert not self.__valid
+ if cb and not self.isWaiting(cb):
+ if self.__waiting_list is None: self.__waiting_list = []
+ self.__waiting_list.append(cb)
+
+ def isWaiting(self, cb):
+ """
+ Return True if a client call-back is waiting for state changes of this cache item.
+ @param cb - a call-back object.
+ @return True if 'cb' is in the wait list.
+ """
+ if not self.__waiting_list: return False
+ for r in self.__waiting_list:
+ if r is cb: return True
+ return False
+
+ def __validate(self):
+ """
+ Initiate data retrieval if the cache is not valid.
+ @return True if the cache is already valid
+ """
+ assert protocol.isDispatchThread()
+ if self.__disposed or self._channel.getState() != channel.STATE_OPEN:
+ self._command = None
+ self.__valid = True
+ self.__error = None
+ self.__data = None
+ else:
+ if self._command is not None: return False
+ if not self.__valid and not self.startDataRetrieval(): return False
+ assert self.__valid
+ assert self._command is None
+ self.post()
+ return True
+
+ def validate(self, cb=None):
+ """
+ If the cache is not valid, initiate data retrieval and
+ add a client call-back to cache wait list.
+ Client call-backs are activated when cache state changes.
+ Call-backs are removed from waiting list after that.
+ It is responsibility of clients to check if the state change was one they are waiting for.
+ If the cache is valid do nothing and return True.
+ @param cb - a call-back object (optional)
+ @return True if the cache is already valid
+ """
+ if not self.__validate():
+ if cb: self.wait(cb)
+ return False
+ return True
+
+ def start(self, command):
+ """
+ Start cache pending state.
+ Pending state is when the cache is waiting for a TCF command to return results.
+ @param command - TCF command handle.
+ """
+ assert not self.__valid
+ assert command
+ assert self._command is None
+ self._command = command
+
+ def done(self, command):
+ """
+ End cache pending state, but not mark the cache as valid.
+ @param command - TCF command handle.
+ """
+ if self._command is not command: return
+ assert not self.__valid
+ self._command = None
+ self.post()
+
+ def set(self, token, error, data):
+ """
+ End cache pending state and mark the cache as valid.
+ If 'token' != None, the data represent results from a completed command.
+ The data should be ignored if current cache pending command is not same as 'token'.
+ It can happen if the cache was reset or canceled during data retrieval.
+ @param token - pending command handle or None.
+ @param error - data retrieval error or None
+ @param data - up-to-date data object
+ """
+ assert protocol.isDispatchThread()
+ if token and self._command is not token: return
+ self._command = None
+ if not self.__disposed:
+ assert not self.__valid
+ if self._channel.getState() != channel.STATE_OPEN:
+ self.__error = None
+ self.__data = None
+ self.__error = error
+ self.__data = data
+ self.__valid = True
+ self.post()
+
+ def reset(self, data=None):
+ """
+ Force cache to become valid, cancel pending data retrieval if data is provided.
+ @param data - up-to-date data object (optional)
+ """
+ assert protocol.isDispatchThread()
+ if data is not None and self._command is not None:
+ self._command.cancel()
+ self._command = None
+ if not self.__disposed:
+ self.__data = data
+ self.__error = None
+ self.__valid = True
+ self.post()
+
+ def cancel(self):
+ """
+ Invalidate the cache.
+ Cancel pending data retrieval if any.
+ """
+ self.reset()
+ if self._command is not None:
+ self._command.cancel()
+ self._command = None
+
+ def dispose(self):
+ """
+ Dispose the cache.
+ Cancel pending data retrieval if any.
+ """
+ self.cancel()
+ self.__valid = True
+ self.__disposed = True
+
+ def __str__(self):
+ bf = cStringIO.StringIO()
+ bf.write('[')
+ if self.__valid: bf.append("valid,")
+ if self.__disposed: bf.write("disposed,")
+ if self.__posted: bf.write("posted,")
+ if self.__error is not None: bf.write("error,")
+ bf.write("data=")
+ bf.write(str(self.__data))
+ bf.write(']')
+ return bf.getvalue()
+
+ def startDataRetrieval(self):
+ """
+ Sub-classes should override this method to implement actual data retrieval logic.
+ @return True is all done, False if retrieval is in progress.
+ """
+ raise NotImplementedError("Abstract method")
diff --git a/python/src/tcf/util/event.py b/python/src/tcf/util/event.py
index 3e5e92118..2ca38df63 100644
--- a/python/src/tcf/util/event.py
+++ b/python/src/tcf/util/event.py
@@ -1,86 +1,86 @@
-# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
-# * All rights reserved. This program and the accompanying materials
-# * are made available under the terms of the Eclipse Public License v1.0
-# * which accompanies this distribution, and is available at
-# * http://www.eclipse.org/legal/epl-v10.html
-# *
-# * Contributors:
-# * Wind River Systems - initial API and implementation
-# *******************************************************************************
-
-import threading
-from tcf import protocol, channel
-
-class DelegatingEventListener(channel.EventListener):
- def __init__(self, callable):
- self._callable = callable
- def event(self, name, data):
- try:
- args = channel.fromJSONSequence(data)
- self._callable(self.svc_name, name, *args)
- except Exception as x:
- protocol.log("Error decoding event data", x)
-
-def _print_event(service, name, *args):
- print "Event: %s.%s%s" % (service, name, tuple(args))
-
-def get_event_printer():
- return DelegatingEventListener(_print_event)
-
-class EventRecorder(object):
- def __init__(self, channel):
- self._channel = channel
- self._events = []
- self._listeners = {}
- self._lock = threading.RLock()
- self._filter = None
- def __del__(self):
- if self._channel.state == channel.STATE_OPEN:
- self.stop()
- def record(self, service, enable=True):
- with self._lock:
- listener = self._listeners.get(service)
- if listener:
- if not enable:
- protocol.invokeLater(self._channel.removeEventListener, service, listener)
- elif enable:
- recorder = self
- class Listener(channel.EventListener):
- def event(self, name, data):
- e = Event(service, name, data)
- recorder._event(e)
- listener = Listener()
- self._listeners[service] = listener
- protocol.invokeLater(self._channel.addEventListener, service, listener)
- self._recording = enable
- def stop(self, service=None):
- if service:
- self.record(service, False)
- else:
- for service in self._listeners.keys():
- self.record(service, False)
- def get(self):
- with self._lock:
- events = self._events
- self._events = []
- return events
- def _event(self, e):
- with self._lock:
- self._events.append(e)
- def __str__(self):
- events = self.get()
- return "\n".join(map(str, events))
- __repr__ = __str__
-
-class Event(object):
- def __init__(self, service, name, data):
- self.service = service
- self.name = name
- try:
- self.args = channel.fromJSONSequence(data)
- except Exception as x:
- protocol.log("Error decoding event data", x)
- def __str__(self):
- return "Event: %s.%s%s" % (self.service, self.name, tuple(self.args))
- __repr__ = __str__
+# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import threading
+from tcf import protocol, channel
+
+class DelegatingEventListener(channel.EventListener):
+ def __init__(self, callable):
+ self._callable = callable
+ def event(self, name, data):
+ try:
+ args = channel.fromJSONSequence(data)
+ self._callable(self.svc_name, name, *args)
+ except Exception as x:
+ protocol.log("Error decoding event data", x)
+
+def _print_event(service, name, *args):
+ print "Event: %s.%s%s" % (service, name, tuple(args))
+
+def get_event_printer():
+ return DelegatingEventListener(_print_event)
+
+class EventRecorder(object):
+ def __init__(self, channel):
+ self._channel = channel
+ self._events = []
+ self._listeners = {}
+ self._lock = threading.RLock()
+ self._filter = None
+ def __del__(self):
+ if self._channel.state == channel.STATE_OPEN:
+ self.stop()
+ def record(self, service, enable=True):
+ with self._lock:
+ listener = self._listeners.get(service)
+ if listener:
+ if not enable:
+ protocol.invokeLater(self._channel.removeEventListener, service, listener)
+ elif enable:
+ recorder = self
+ class Listener(channel.EventListener):
+ def event(self, name, data):
+ e = Event(service, name, data)
+ recorder._event(e)
+ listener = Listener()
+ self._listeners[service] = listener
+ protocol.invokeLater(self._channel.addEventListener, service, listener)
+ self._recording = enable
+ def stop(self, service=None):
+ if service:
+ self.record(service, False)
+ else:
+ for service in self._listeners.keys():
+ self.record(service, False)
+ def get(self):
+ with self._lock:
+ events = self._events
+ self._events = []
+ return events
+ def _event(self, e):
+ with self._lock:
+ self._events.append(e)
+ def __str__(self):
+ events = self.get()
+ return "\n".join(map(str, events))
+ __repr__ = __str__
+
+class Event(object):
+ def __init__(self, service, name, data):
+ self.service = service
+ self.name = name
+ try:
+ self.args = channel.fromJSONSequence(data)
+ except Exception as x:
+ protocol.log("Error decoding event data", x)
+ def __str__(self):
+ return "Event: %s.%s%s" % (self.service, self.name, tuple(self.args))
+ __repr__ = __str__
diff --git a/python/src/tcf/util/logging.py b/python/src/tcf/util/logging.py
index afe3037fc..0a15b5cde 100644
--- a/python/src/tcf/util/logging.py
+++ b/python/src/tcf/util/logging.py
@@ -1,54 +1,54 @@
-# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
-# * All rights reserved. This program and the accompanying materials
-# * are made available under the terms of the Eclipse Public License v1.0
-# * which accompanies this distribution, and is available at
-# * http://www.eclipse.org/legal/epl-v10.html
-# *
-# * Contributors:
-# * Wind River Systems - initial API and implementation
-# *******************************************************************************
-
-"Internal utility methods used for logging/tracing."
-
-from tcf import protocol
-import locale, time, cStringIO
-
-DECIMAL_DELIMITER = locale.localeconv().get('decimal_point', '.')
-
-def getDebugTime():
- """
- Returns a relative timestamp in the form "seconds,milliseconds". Each
- segment is zero-padded to three digits, ensuring a consistent length of
- seven characters. The timestamp has no absolute meaning. It is merely the
- elapsed time since January 1, 1970 UT truncated at 999 seconds. Do not
- use this for production code, especially for mathematically determining
- the relative time between two events, since the counter will flip to zero
- roughly every 16 minutes.
- """
- traceBuilder = cStringIO.StringIO()
-
- # Record the time
- tm = int(time.time() * 1000)
- seconds = (tm / 1000) % 1000
- if seconds < 100: traceBuilder.write('0')
- if seconds < 10: traceBuilder.write('0')
- traceBuilder.write(str(seconds))
- traceBuilder.write(DECIMAL_DELIMITER)
- millis = tm % 1000
- if millis < 100: traceBuilder.write('0')
- if millis < 10: traceBuilder.write('0')
- traceBuilder.write(str(millis))
- return traceBuilder.getvalue()
-
-def trace(msg):
- """
- Trace hooks should use this method to log a message. It prepends the
- message with a timestamp and sends it to the TCF logger facility. The
- logger implementation may or may not inject its own timestamp. For
- tracing, we definitely need one, so we introduce a minimal, relative-time
- stamp.
-
- @param msg the trace message
- """
- protocol.log('%s %s' % (getDebugTime(), msg))
+# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+"Internal utility methods used for logging/tracing."
+
+from tcf import protocol
+import locale, time, cStringIO
+
+DECIMAL_DELIMITER = locale.localeconv().get('decimal_point', '.')
+
+def getDebugTime():
+ """
+ Returns a relative timestamp in the form "seconds,milliseconds". Each
+ segment is zero-padded to three digits, ensuring a consistent length of
+ seven characters. The timestamp has no absolute meaning. It is merely the
+ elapsed time since January 1, 1970 UT truncated at 999 seconds. Do not
+ use this for production code, especially for mathematically determining
+ the relative time between two events, since the counter will flip to zero
+ roughly every 16 minutes.
+ """
+ traceBuilder = cStringIO.StringIO()
+
+ # Record the time
+ tm = int(time.time() * 1000)
+ seconds = (tm / 1000) % 1000
+ if seconds < 100: traceBuilder.write('0')
+ if seconds < 10: traceBuilder.write('0')
+ traceBuilder.write(str(seconds))
+ traceBuilder.write(DECIMAL_DELIMITER)
+ millis = tm % 1000
+ if millis < 100: traceBuilder.write('0')
+ if millis < 10: traceBuilder.write('0')
+ traceBuilder.write(str(millis))
+ return traceBuilder.getvalue()
+
+def trace(msg):
+ """
+ Trace hooks should use this method to log a message. It prepends the
+ message with a timestamp and sends it to the TCF logger facility. The
+ logger implementation may or may not inject its own timestamp. For
+ tracing, we definitely need one, so we introduce a minimal, relative-time
+ stamp.
+
+ @param msg the trace message
+ """
+ protocol.log('%s %s' % (getDebugTime(), msg))
diff --git a/python/src/tcf/util/sync.py b/python/src/tcf/util/sync.py
index d85434876..400c96526 100644
--- a/python/src/tcf/util/sync.py
+++ b/python/src/tcf/util/sync.py
@@ -1,222 +1,222 @@
-# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
-# * All rights reserved. This program and the accompanying materials
-# * are made available under the terms of the Eclipse Public License v1.0
-# * which accompanies this distribution, and is available at
-# * http://www.eclipse.org/legal/epl-v10.html
-# *
-# * Contributors:
-# * Wind River Systems - initial API and implementation
-# *******************************************************************************
-
-import threading, types
-from tcf import protocol
-from tcf.channel.Command import Command
-
-class DispatchWrapper(object):
- "Simple wrapper for attribute access and invocation on TCF dispatch thread"
- def __init__(self, inner):
- self.inner = inner
- def __getattr__(self, attr):
- val = protocol.invokeAndWait(getattr, self.inner, attr)
- if type(val) in (types.FunctionType, types.MethodType):
- return DispatchWrapper(val)
- return val
- def __call__(self, *args, **kwargs):
- return protocol.invokeAndWait(self.inner, *args, **kwargs)
-
-class CommandControl(object):
- """Provides a simple interface to send commands to remote services
- and receive results.
-
- Usage:
- > cmd = CommandControl(channel)
- > cmd.<service>.<command>(<args>)
-
- Examples:
- # send command, but don't wait for result:
- > cmd.RunControl.suspend("system")
- # getE() returns the result and raises an exception in case of error response:
- > result = cmd.RunControl.getChildren(None).getE()
- # to get error and result at the same time, use this form:
- > error, result = cmd.Breakpoints.getIDs()
- """
- def __init__(self, channel, onDone=None, interactive=False):
- self._lock = threading.Condition()
- self._channel = channel
- self._onDone = onDone
- self._interactive = interactive
- self._queue = []
- self._pending = {}
- self._complete = []
- def __getattr__(self, attr):
- val = getattr(self._channel, attr, None)
- if val:
- if self._interactive and type(val) in (types.FunctionType, types.MethodType):
- val = DispatchWrapper(val)
- return val
- services = protocol.invokeAndWait(self._channel.getRemoteServices)
- if attr == "services":
- return services
- if attr in services:
- return ServiceWrapper(self, attr)
- raise AttributeError("Unknown service: %s. Use one of %s" % (attr, services))
- def invoke(self, service, command, *args, **kwargs):
- cmd = None
- if not protocol.isDispatchThread():
- if not kwargs.get("async"):
- cmd = protocol.invokeAndWait(self._invoke, service, command, *args, **kwargs)
- if cmd and self._interactive:
- return cmd.getE()
- else:
- with self._lock:
- self._queue.append((service, command, args, kwargs))
- if len(self._queue) == 1:
- protocol.invokeLater(self._processQueue)
- return
- return cmd
- def _invoke(self, service, command, *args, **kwargs):
- cmdCtrl = self
- class GenericCommand(Command):
- _result = None
- def done(self, error, args):
- resultArgs = None
- if not error and args:
- # error result is usually in args[0], but there are exceptions
- if service == "StackTrace" and command == "getContext":
- error = self.toError(args[1])
- resultArgs = (args[0],)
- elif service == "Expressions" and command == "evaluate":
- error = self.toError(args[1])
- resultArgs = (args[0], args[2])
- elif service == "FileSystem" and command in ('read', 'readdir', 'roots'):
- error = self.toError(args[1])
- resultArgs = (args[0],) + tuple(args[2:])
- elif service == "Diagnostics" and command.startswith("echo"):
- resultArgs = (args[0],)
- else:
- error = self.toError(args[0])
- resultArgs = args[1:]
- cmdCtrl._doneCommand(self.token, error, resultArgs)
- def wait(self, timeout=None):
- cmdCtrl._waitForCommand(self.token, timeout)
- def cancel(self):
- return protocol.invokeAndWait(self.token.cancel)
- def getResult(self, wait=None):
- if wait:
- cmdCtrl._waitForCommand(self.token)
- return self._result
- def getE(self):
- r = self.getResult(True)
- if r.error:
- raise r.error
- return r.args
- def get(self):
- r = self.getResult(True)
- return r.args
- def getError(self):
- r = self.getResult(True)
- return r.error
- def __str__(self):
- if self._async:
- return self.getCommandString()
- return str(self.get())
- def __iter__(self):
- return iter(self.getResult(True))
- cmd = GenericCommand(self._channel, service, command, args)
- cmd._async = kwargs.get("async")
- cmd._onDone = kwargs.get("onDone")
- self._addPending(cmd)
- return cmd
- def _processQueue(self):
- assert protocol.isDispatchThread()
- with self._lock:
- for cmd in self._queue:
- service, command, args, kwargs = cmd
- self._invoke(service, command, *args, **kwargs)
- del self._queue[:]
- def _addPending(self, cmd):
- with self._lock:
- self._pending[cmd.token.id] = cmd
- self._lock.notifyAll()
- def _doneCommand(self, token, error, args):
- with self._lock:
- cmd = self._pending.get(token.id)
- assert cmd
- del self._pending[token.id]
- cmd._result = CommandResult(token, error, args)
- if cmd._async: self._complete.append(cmd)
- isDone = self.isDone()
- if isDone: self._lock.notifyAll()
- if cmd._onDone:
- if args is None: args = (None,)
- cmd._onDone(error, *args)
- if isDone and self._onDone: self._onDone()
- def isDone(self):
- with self._lock:
- return not self._pending and not self._queue
- def wait(self, timeout=None):
- assert not protocol.isDispatchThread()
- with self._lock:
- while self._pending or self._queue:
- self._lock.wait(timeout)
- if timeout: break
- def _waitForCommand(self, token, timeout=None):
- assert not protocol.isDispatchThread()
- with self._lock:
- while token.id in self._pending:
- self._lock.wait(timeout)
- if timeout: break
- else:
- if self._queue:
- self._lock.wait(timeout)
- while token.id in self._pending:
- self._lock.wait(timeout)
- if timeout: break
- def cancel(self):
- if not protocol.isDispatchThread():
- protocol.invokeLater(self.cancel)
- return
- with self._lock:
- for cmd in self._pending.values():
- cmd.token.cancel()
- del self._queue[:]
- def getResult(self, wait=True):
- if wait:
- self.wait()
- with self._lock:
- result = map(lambda c: c.getResult(), self._complete)
- del self._complete[:]
- return result
-
-class CommandResult(object):
- def __init__(self, token, error, args):
- self.token = token
- self.error = error
- # unwrap result if only one element
- if args and len(args) == 1:
- args = args[0]
- self.args = args
- def __str__(self):
- if self.error:
- return "[%s] error: %s" % (self.token.id, self.error)
- return "[%s] result: %s" % (self.token.id, self.args)
- __repr__ = __str__
- def __iter__(self):
- yield self.error
- yield self.args
-
-class ServiceWrapper(object):
- def __init__(self, control, service):
- self._control = control
- self._service = service
- def __getattr__(self, attr):
- return CommandWrapper(self._control, self._service, attr)
-
-class CommandWrapper(object):
- def __init__(self, control, service, command):
- self._control = control
- self._service = service
- self._command = command
- def __call__(self, *args, **kwargs):
- return self._control.invoke(self._service, self._command, *args, **kwargs)
+# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import threading, types
+from tcf import protocol
+from tcf.channel.Command import Command
+
+class DispatchWrapper(object):
+ "Simple wrapper for attribute access and invocation on TCF dispatch thread"
+ def __init__(self, inner):
+ self.inner = inner
+ def __getattr__(self, attr):
+ val = protocol.invokeAndWait(getattr, self.inner, attr)
+ if type(val) in (types.FunctionType, types.MethodType):
+ return DispatchWrapper(val)
+ return val
+ def __call__(self, *args, **kwargs):
+ return protocol.invokeAndWait(self.inner, *args, **kwargs)
+
+class CommandControl(object):
+ """Provides a simple interface to send commands to remote services
+ and receive results.
+
+ Usage:
+ > cmd = CommandControl(channel)
+ > cmd.<service>.<command>(<args>)
+
+ Examples:
+ # send command, but don't wait for result:
+ > cmd.RunControl.suspend("system")
+ # getE() returns the result and raises an exception in case of error response:
+ > result = cmd.RunControl.getChildren(None).getE()
+ # to get error and result at the same time, use this form:
+ > error, result = cmd.Breakpoints.getIDs()
+ """
+ def __init__(self, channel, onDone=None, interactive=False):
+ self._lock = threading.Condition()
+ self._channel = channel
+ self._onDone = onDone
+ self._interactive = interactive
+ self._queue = []
+ self._pending = {}
+ self._complete = []
+ def __getattr__(self, attr):
+ val = getattr(self._channel, attr, None)
+ if val:
+ if self._interactive and type(val) in (types.FunctionType, types.MethodType):
+ val = DispatchWrapper(val)
+ return val
+ services = protocol.invokeAndWait(self._channel.getRemoteServices)
+ if attr == "services":
+ return services
+ if attr in services:
+ return ServiceWrapper(self, attr)
+ raise AttributeError("Unknown service: %s. Use one of %s" % (attr, services))
+ def invoke(self, service, command, *args, **kwargs):
+ cmd = None
+ if not protocol.isDispatchThread():
+ if not kwargs.get("async"):
+ cmd = protocol.invokeAndWait(self._invoke, service, command, *args, **kwargs)
+ if cmd and self._interactive:
+ return cmd.getE()
+ else:
+ with self._lock:
+ self._queue.append((service, command, args, kwargs))
+ if len(self._queue) == 1:
+ protocol.invokeLater(self._processQueue)
+ return
+ return cmd
+ def _invoke(self, service, command, *args, **kwargs):
+ cmdCtrl = self
+ class GenericCommand(Command):
+ _result = None
+ def done(self, error, args):
+ resultArgs = None
+ if not error and args:
+ # error result is usually in args[0], but there are exceptions
+ if service == "StackTrace" and command == "getContext":
+ error = self.toError(args[1])
+ resultArgs = (args[0],)
+ elif service == "Expressions" and command == "evaluate":
+ error = self.toError(args[1])
+ resultArgs = (args[0], args[2])
+ elif service == "FileSystem" and command in ('read', 'readdir', 'roots'):
+ error = self.toError(args[1])
+ resultArgs = (args[0],) + tuple(args[2:])
+ elif service == "Diagnostics" and command.startswith("echo"):
+ resultArgs = (args[0],)
+ else:
+ error = self.toError(args[0])
+ resultArgs = args[1:]
+ cmdCtrl._doneCommand(self.token, error, resultArgs)
+ def wait(self, timeout=None):
+ cmdCtrl._waitForCommand(self.token, timeout)
+ def cancel(self):
+ return protocol.invokeAndWait(self.token.cancel)
+ def getResult(self, wait=None):
+ if wait:
+ cmdCtrl._waitForCommand(self.token)
+ return self._result
+ def getE(self):
+ r = self.getResult(True)
+ if r.error:
+ raise r.error
+ return r.args
+ def get(self):
+ r = self.getResult(True)
+ return r.args
+ def getError(self):
+ r = self.getResult(True)
+ return r.error
+ def __str__(self):
+ if self._async:
+ return self.getCommandString()
+ return str(self.get())
+ def __iter__(self):
+ return iter(self.getResult(True))
+ cmd = GenericCommand(self._channel, service, command, args)
+ cmd._async = kwargs.get("async")
+ cmd._onDone = kwargs.get("onDone")
+ self._addPending(cmd)
+ return cmd
+ def _processQueue(self):
+ assert protocol.isDispatchThread()
+ with self._lock:
+ for cmd in self._queue:
+ service, command, args, kwargs = cmd
+ self._invoke(service, command, *args, **kwargs)
+ del self._queue[:]
+ def _addPending(self, cmd):
+ with self._lock:
+ self._pending[cmd.token.id] = cmd
+ self._lock.notifyAll()
+ def _doneCommand(self, token, error, args):
+ with self._lock:
+ cmd = self._pending.get(token.id)
+ assert cmd
+ del self._pending[token.id]
+ cmd._result = CommandResult(token, error, args)
+ if cmd._async: self._complete.append(cmd)
+ isDone = self.isDone()
+ if isDone: self._lock.notifyAll()
+ if cmd._onDone:
+ if args is None: args = (None,)
+ cmd._onDone(error, *args)
+ if isDone and self._onDone: self._onDone()
+ def isDone(self):
+ with self._lock:
+ return not self._pending and not self._queue
+ def wait(self, timeout=None):
+ assert not protocol.isDispatchThread()
+ with self._lock:
+ while self._pending or self._queue:
+ self._lock.wait(timeout)
+ if timeout: break
+ def _waitForCommand(self, token, timeout=None):
+ assert not protocol.isDispatchThread()
+ with self._lock:
+ while token.id in self._pending:
+ self._lock.wait(timeout)
+ if timeout: break
+ else:
+ if self._queue:
+ self._lock.wait(timeout)
+ while token.id in self._pending:
+ self._lock.wait(timeout)
+ if timeout: break
+ def cancel(self):
+ if not protocol.isDispatchThread():
+ protocol.invokeLater(self.cancel)
+ return
+ with self._lock:
+ for cmd in self._pending.values():
+ cmd.token.cancel()
+ del self._queue[:]
+ def getResult(self, wait=True):
+ if wait:
+ self.wait()
+ with self._lock:
+ result = map(lambda c: c.getResult(), self._complete)
+ del self._complete[:]
+ return result
+
+class CommandResult(object):
+ def __init__(self, token, error, args):
+ self.token = token
+ self.error = error
+ # unwrap result if only one element
+ if args and len(args) == 1:
+ args = args[0]
+ self.args = args
+ def __str__(self):
+ if self.error:
+ return "[%s] error: %s" % (self.token.id, self.error)
+ return "[%s] result: %s" % (self.token.id, self.args)
+ __repr__ = __str__
+ def __iter__(self):
+ yield self.error
+ yield self.args
+
+class ServiceWrapper(object):
+ def __init__(self, control, service):
+ self._control = control
+ self._service = service
+ def __getattr__(self, attr):
+ return CommandWrapper(self._control, self._service, attr)
+
+class CommandWrapper(object):
+ def __init__(self, control, service, command):
+ self._control = control
+ self._service = service
+ self._command = command
+ def __call__(self, *args, **kwargs):
+ return self._control.invoke(self._service, self._command, *args, **kwargs)
diff --git a/python/src/tcf/util/task.py b/python/src/tcf/util/task.py
index a31d49a6e..c541dea6a 100644
--- a/python/src/tcf/util/task.py
+++ b/python/src/tcf/util/task.py
@@ -1,180 +1,180 @@
-#******************************************************************************
-# Copyright (c) 2011 Wind River Systems, Inc. and others.
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Eclipse Public License v1.0
-# which accompanies this distribution, and is available at
-# http://www.eclipse.org/legal/epl-v10.html
-#
-# Contributors:
-# Wind River Systems - initial API and implementation
-#******************************************************************************
-
-import threading
-from tcf import protocol, channel
-
-class Task(object):
- """
- A <tt>Task</tt> is an utility class that represents the result of an asynchronous
- communication over TCF framework. Methods are provided to check if the communication is
- complete, to wait for its completion, and to retrieve the result of
- the communication.
-
- Task is useful when communication is requested by a thread other then TCF dispatch thread.
- If client has a global state, for example, cached remote data, multithreading should be avoided,
- because it is extremely difficult to ensure absence of racing conditions or deadlocks in such environment.
- Such clients should consider message driven design, see DataCache and its usage as an example.
-
- If a client is extending Task it should implement run() method to perform actual communications.
- The run() method will be execute by TCF dispatch thread, and client code should then call either done() or
- error() to indicate that task computations are complete.
- """
- __result = None
- __is_done = False
- __error = None
- __canceled = False
- __channel_listener = None
-
- def __init__(self, target=None, *args, **kwargs):
- """
- Construct a TCF task object and schedule it for execution.
- """
- if target:
- kwargs["done"] = self.__done
- else:
- target = self.run
-
- self._target = target
- self._args = args
- self._kwargs = kwargs
- self._lock = threading.Condition()
- self.__channel = kwargs.pop("channel", None)
- protocol.invokeLater(self.__doRun)
- timeout = kwargs.pop("timeout", None)
- if timeout:
- protocol.invokeLaterWithDelay(timeout, self.cancel)
-
- def __doRun(self):
- try:
- if self.__channel:
- if self.__channel.getState() != channel.STATE_OPEN:
- raise Exception("Channel is closed")
- task = self
- class CancelOnClose(channel.ChannelListener):
- def onChannelClosed(self, error):
- task.cancel(True)
- self.__channel_listener = CancelOnClose()
- self.__channel.addChannelListener(self.__channel_listener)
- self._target(*self._args, **self._kwargs)
- except Exception as x:
- if not self.__is_done and self.__error is None:
- self.error(x)
-
- def __done(self, error, result):
- if error:
- self.error(error)
- else:
- self.done(result)
-
- def run(self, *args, **kwargs):
- raise NotImplementedError("Abstract method")
-
- def done(self, result):
- with self._lock:
- assert protocol.isDispatchThread()
- if self.__canceled: return
- assert not self.__is_done
- assert not self.__error
- assert self.__result is None
- self.__result = result
- self.__is_done = True
- if self.__channel:
- self.__channel.removeChannelListener(self.__channel_listener)
- self._lock.notifyAll()
-
- def error(self, error):
- """
- Set a error and notify all threads waiting for the task to complete.
- The method is supposed to be called in response to executing of run() method of this task.
-
- @param error - computation error.
- """
- assert protocol.isDispatchThread()
- assert error
- with self._lock:
- if self.__canceled: return
- assert self.__error is None
- assert self.__result is None
- assert not self.__is_done
- self.__error = error
- if self.__channel:
- self.__channel.removeChannelListener(self.__channel_listener)
- self._lock.notifyAll()
-
- def cancel(self):
- assert protocol.isDispatchThread()
- with self._lock:
- if self.isDone(): return False
- self.__canceled = True
- self.__error = Exception("Canceled")
- if self.__channel:
- self.__channel.removeChannelListener(self.__channel_listener)
- self._lock.notifyAll()
- return True
-
- def get(self, timeout=None):
- """
- Waits if necessary for the computation to complete, and then
- retrieves its result.
-
- @return the computed result
- @throws CancellationException if the computation was canceled
- @throws ExecutionException if the computation threw an
- exception
- @throws InterruptedException if the current thread was interrupted
- while waiting
- """
- assert not protocol.isDispatchThread()
- with self._lock:
- while not self.isDone():
- self._lock.wait(timeout)
- if timeout and not self.isDone():
- raise TimeoutException("Timed out")
- if self.__error:
- raise Exception("TCF task aborted", self.__error)
- return self.__result
-
- def isCancelled(self):
- """
- Returns <tt>true</tt> if this task was canceled before it completed
- normally.
-
- @return <tt>true</tt> if task was canceled before it completed
- """
- with self._lock:
- return self.__canceled
-
- def isDone(self):
- """
- Returns <tt>true</tt> if this task completed.
-
- Completion may be due to normal termination, an exception, or
- cancellation -- in all of these cases, this method will return
- <tt>true</tt>.
-
- @return <tt>true</tt> if this task completed.
- """
- with self._lock:
- return self.__error or self.__is_done
-
- def getError(self):
- """
- Return task execution error if any.
- @return Exception object or None
- """
- return self.__error
-
- def getResult(self):
- return self.__result
-
-class TimeoutException(Exception):
- pass
+#******************************************************************************
+# Copyright (c) 2011 Wind River Systems, Inc. and others.
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Public License v1.0
+# which accompanies this distribution, and is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Contributors:
+# Wind River Systems - initial API and implementation
+#******************************************************************************
+
+import threading
+from tcf import protocol, channel
+
+class Task(object):
+ """
+ A <tt>Task</tt> is an utility class that represents the result of an asynchronous
+ communication over TCF framework. Methods are provided to check if the communication is
+ complete, to wait for its completion, and to retrieve the result of
+ the communication.
+
+ Task is useful when communication is requested by a thread other then TCF dispatch thread.
+ If client has a global state, for example, cached remote data, multithreading should be avoided,
+ because it is extremely difficult to ensure absence of racing conditions or deadlocks in such environment.
+ Such clients should consider message driven design, see DataCache and its usage as an example.
+
+ If a client is extending Task it should implement run() method to perform actual communications.
+ The run() method will be execute by TCF dispatch thread, and client code should then call either done() or
+ error() to indicate that task computations are complete.
+ """
+ __result = None
+ __is_done = False
+ __error = None
+ __canceled = False
+ __channel_listener = None
+
+ def __init__(self, target=None, *args, **kwargs):
+ """
+ Construct a TCF task object and schedule it for execution.
+ """
+ if target:
+ kwargs["done"] = self.__done
+ else:
+ target = self.run
+
+ self._target = target
+ self._args = args
+ self._kwargs = kwargs
+ self._lock = threading.Condition()
+ self.__channel = kwargs.pop("channel", None)
+ protocol.invokeLater(self.__doRun)
+ timeout = kwargs.pop("timeout", None)
+ if timeout:
+ protocol.invokeLaterWithDelay(timeout, self.cancel)
+
+ def __doRun(self):
+ try:
+ if self.__channel:
+ if self.__channel.getState() != channel.STATE_OPEN:
+ raise Exception("Channel is closed")
+ task = self
+ class CancelOnClose(channel.ChannelListener):
+ def onChannelClosed(self, error):
+ task.cancel(True)
+ self.__channel_listener = CancelOnClose()
+ self.__channel.addChannelListener(self.__channel_listener)
+ self._target(*self._args, **self._kwargs)
+ except Exception as x:
+ if not self.__is_done and self.__error is None:
+ self.error(x)
+
+ def __done(self, error, result):
+ if error:
+ self.error(error)
+ else:
+ self.done(result)
+
+ def run(self, *args, **kwargs):
+ raise NotImplementedError("Abstract method")
+
+ def done(self, result):
+ with self._lock:
+ assert protocol.isDispatchThread()
+ if self.__canceled: return
+ assert not self.__is_done
+ assert not self.__error
+ assert self.__result is None
+ self.__result = result
+ self.__is_done = True
+ if self.__channel:
+ self.__channel.removeChannelListener(self.__channel_listener)
+ self._lock.notifyAll()
+
+ def error(self, error):
+ """
+ Set a error and notify all threads waiting for the task to complete.
+ The method is supposed to be called in response to executing of run() method of this task.
+
+ @param error - computation error.
+ """
+ assert protocol.isDispatchThread()
+ assert error
+ with self._lock:
+ if self.__canceled: return
+ assert self.__error is None
+ assert self.__result is None
+ assert not self.__is_done
+ self.__error = error
+ if self.__channel:
+ self.__channel.removeChannelListener(self.__channel_listener)
+ self._lock.notifyAll()
+
+ def cancel(self):
+ assert protocol.isDispatchThread()
+ with self._lock:
+ if self.isDone(): return False
+ self.__canceled = True
+ self.__error = Exception("Canceled")
+ if self.__channel:
+ self.__channel.removeChannelListener(self.__channel_listener)
+ self._lock.notifyAll()
+ return True
+
+ def get(self, timeout=None):
+ """
+ Waits if necessary for the computation to complete, and then
+ retrieves its result.
+
+ @return the computed result
+ @throws CancellationException if the computation was canceled
+ @throws ExecutionException if the computation threw an
+ exception
+ @throws InterruptedException if the current thread was interrupted
+ while waiting
+ """
+ assert not protocol.isDispatchThread()
+ with self._lock:
+ while not self.isDone():
+ self._lock.wait(timeout)
+ if timeout and not self.isDone():
+ raise TimeoutException("Timed out")
+ if self.__error:
+ raise Exception("TCF task aborted", self.__error)
+ return self.__result
+
+ def isCancelled(self):
+ """
+ Returns <tt>true</tt> if this task was canceled before it completed
+ normally.
+
+ @return <tt>true</tt> if task was canceled before it completed
+ """
+ with self._lock:
+ return self.__canceled
+
+ def isDone(self):
+ """
+ Returns <tt>true</tt> if this task completed.
+
+ Completion may be due to normal termination, an exception, or
+ cancellation -- in all of these cases, this method will return
+ <tt>true</tt>.
+
+ @return <tt>true</tt> if this task completed.
+ """
+ with self._lock:
+ return self.__error or self.__is_done
+
+ def getError(self):
+ """
+ Return task execution error if any.
+ @return Exception object or None
+ """
+ return self.__error
+
+ def getResult(self):
+ return self.__result
+
+class TimeoutException(Exception):
+ pass

Back to the top