Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraleherbau2011-05-25 08:35:21 -0400
committeraleherbau2011-05-25 08:35:21 -0400
commit4e0d17e91c267c23b23e46ea09b804c6869a09c6 (patch)
tree9c7dfd9fd84abbf32699226e42013701f62d57c7 /python/src/tcf/protocol.py
parent5fa9053b2ee4244f0ccc2dcad4273e69ff371068 (diff)
downloadorg.eclipse.tcf-4e0d17e91c267c23b23e46ea09b804c6869a09c6.tar.gz
org.eclipse.tcf-4e0d17e91c267c23b23e46ea09b804c6869a09c6.tar.xz
org.eclipse.tcf-4e0d17e91c267c23b23e46ea09b804c6869a09c6.zip
TCF Python: Implemented timer queue, added test for Task, code cleanup
Diffstat (limited to 'python/src/tcf/protocol.py')
-rw-r--r--python/src/tcf/protocol.py62
1 files changed, 54 insertions, 8 deletions
diff --git a/python/src/tcf/protocol.py b/python/src/tcf/protocol.py
index 9dfcf558e..96b91d18c 100644
--- a/python/src/tcf/protocol.py
+++ b/python/src/tcf/protocol.py
@@ -19,18 +19,25 @@ It also provides utility methods for posting asynchronous events,
including delayed events (timers).
"""
-import sys, uuid, threading
+import sys, uuid, threading, time
from EventQueue import EventQueue
_event_queue = None
+_timer_dispatcher = None
+
def startEventQueue():
- global _event_queue
+ global _event_queue, _timer_dispatcher
if _event_queue and not _event_queue.isShutdown(): return
_event_queue = EventQueue()
_event_queue.start()
# initialize LocatorService
from services.local.LocatorService import LocatorService
_event_queue.invokeLater(LocatorService)
+ # start timer dispatcher
+ _timer_dispatcher = threading.Thread(target = _dispatch_timers)
+ _timer_dispatcher.setName("TCF Timer Dispatcher")
+ _timer_dispatcher.setDaemon(True)
+ _timer_dispatcher.start()
def getEventQueue():
"""
@@ -78,11 +85,9 @@ def invokeLaterWithDelay(delay, callable, *args, **kwargs):
if delay <= 0:
_event_queue.invokeLater(callable, *args, **kwargs)
else:
- # TODO timer_queue
- raise NotImplementedError("Implement invokeLaterWithDelay")
-# synchronized (timer_queue) {
-# timer_queue.add(new Timer(System.currentTimeMillis() + delay, runnable))
-# timer_queue.notify()
+ with _timer_queue_lock:
+ _timer_queue.append(Timer(time.time() + delay /1000., callable, *args, **kwargs))
+ _timer_queue_lock.notify()
def invokeAndWait(callable, *args, **kwargs):
"""
@@ -134,7 +139,7 @@ def log(msg, x=None):
@see #setLogger
This method can be invoked from any thread.
@param msg - log entry text
- @param x - an exception associated with the log entry or null.
+ @param x - an exception associated with the log entry or None.
"""
if not _logger:
print>>sys.stderr, msg
@@ -318,3 +323,44 @@ def removeTransportProvider(provider):
"""
import transport
transport.removeTransportProvider(provider)
+
+class Timer(object):
+ timer_cnt = 0
+ def __init__(self, time, run, *args, **kwargs):
+ self.id = Timer.timer_cnt
+ Timer.timer_cnt += 1
+ self.time = time
+ self.run = run
+ self.args = args
+ self.kwargs = kwargs
+
+ def __cmp__(self, x):
+ if x is self: return 0
+ if self.time < x.time: return -1
+ if self.time > x.time: return +1
+ if self.id < x.id: return -1
+ if self.id > x.id: return +1
+ assert False
+ return 0
+
+_timer_queue_lock = threading.Condition()
+_timer_queue = []
+def _dispatch_timers():
+ try:
+ with _timer_queue_lock:
+ while True:
+ if not _timer_queue:
+ _timer_queue_lock.wait()
+ else:
+ tm = time.time()
+ t = _timer_queue[0]
+ if t.time > tm:
+ _timer_queue_lock.wait(t.time - tm)
+ else:
+ _timer_queue.pop(0)
+ invokeLater(t.run, *t.args, **t.kwargs)
+ except RuntimeError:
+ # Event queue is shut down, exit this thread
+ pass
+ except Exception as x:
+ log("Exception in TCF dispatch loop", x)

Back to the top