Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'python/src/tcf/protocol.py')
-rw-r--r--python/src/tcf/protocol.py60
1 files changed, 44 insertions, 16 deletions
diff --git a/python/src/tcf/protocol.py b/python/src/tcf/protocol.py
index 2fd45e634..b424b8167 100644
--- a/python/src/tcf/protocol.py
+++ b/python/src/tcf/protocol.py
@@ -1,5 +1,5 @@
# *****************************************************************************
-# * Copyright (c) 2011, 2013 Wind River Systems, Inc. and others.
+# * Copyright (c) 2011, 2013, 2016 Wind River Systems, Inc. and others.
# * All rights reserved. This program and the accompanying materials
# * are made available under the terms of the Eclipse Public License v1.0
# * which accompanies this distribution, and is available at
@@ -40,7 +40,7 @@ def startEventQueue():
_event_queue = EventQueue.EventQueue(on_shutdown=shutdownDiscovery)
_event_queue.start()
# initialize LocatorService
- from services.local.LocatorService import LocatorService
+ from .services.local.LocatorService import LocatorService
_event_queue.invokeLater(LocatorService)
# start timer dispatcher
_timer_dispatcher = threading.Thread(target=_dispatch_timers)
@@ -49,6 +49,26 @@ def startEventQueue():
_timer_dispatcher.start()
+def stopEventQueue():
+ """Stop the TCF event queue."""
+
+ if not _event_queue or _event_queue.isShutdown():
+ return
+
+ _event_queue.shutdown()
+
+ # Stop timer dispatcher
+
+ global _timer_queue_alive
+
+ try:
+ with _timer_queue_lock:
+ _timer_queue_alive = False
+ _timer_queue_lock.notify_all()
+ except Exception:
+ pass
+
+
def getEventQueue():
"""
@return instance of event queue that is used for TCF events.
@@ -164,16 +184,16 @@ def log(msg, x=None):
@param x - an exception associated with the log entry or None.
"""
if not _logger:
- print >> sys.stderr, msg
+ sys.stderr.write(str(msg) + '\n')
while x:
import traceback
- print >> sys.stderr, "%s: %s" % (type(x).__name__, x)
+ sys.stderr.write("%s: %s\n" % (type(x).__name__, x))
tb = getattr(x, "tb", None) or sys.exc_info()[2]
if tb:
traceback.print_tb(tb)
caused_by = getattr(x, "caused_by", None)
if caused_by:
- print >> sys.stderr, "Caused by:"
+ sys.stderr.write("Caused by: " + str(caused_by) + "\n")
x = caused_by
else:
break
@@ -184,7 +204,7 @@ def log(msg, x=None):
def startDiscovery():
"Start discovery of remote peers if not running yet"
# initialize LocatorService
- from services.local.LocatorService import LocatorService
+ from .services.local.LocatorService import LocatorService
if LocatorService.locator:
invokeAndWait(LocatorService.startup)
@@ -212,7 +232,7 @@ def getOpenChannels():
@return an array of IChannel
"""
assert isDispatchThread()
- import transport
+ from . import transport
return transport.getOpenChannels()
@@ -237,7 +257,7 @@ def addChannelOpenListener(listener):
@param listener
"""
assert isDispatchThread()
- import transport
+ from . import transport
transport.addChannelOpenListener(listener)
@@ -247,7 +267,7 @@ def removeChannelOpenListener(listener):
@param listener
"""
assert isDispatchThread()
- import transport
+ from . import transport
transport.removeChannelOpenListener(listener)
@@ -257,7 +277,7 @@ def sendEvent(service_name, event_name, data):
The message is sent to all open communication channels - broadcasted.
"""
assert isDispatchThread()
- import transport
+ from . import transport
transport.sendEvent(service_name, event_name, data)
@@ -276,7 +296,7 @@ def sync(done):
targets.
"""
assert isDispatchThread()
- import transport
+ from . import transport
transport.sync(done)
@@ -355,7 +375,7 @@ def addServiceProvider(provider):
This method can be invoked from any thread.
@param provider - ServiceProvider implementation
"""
- import services
+ from . import services
services.addServiceProvider(provider)
@@ -365,7 +385,7 @@ def removeServiceProvider(provider):
This method can be invoked from any thread.
@param provider - ServiceProvider implementation
"""
- import services
+ from . import services
services.removeServiceProvider(provider)
@@ -375,7 +395,7 @@ def addTransportProvider(provider):
This method can be invoked from any thread.
@param provider - TransportProvider implementation
"""
- import transport
+ from . import transport
transport.addTransportProvider(provider)
@@ -385,7 +405,7 @@ def removeTransportProvider(provider):
This method can be invoked from any thread.
@param provider - TransportProvider implementation
"""
- import transport
+ from . import transport
transport.removeTransportProvider(provider)
@@ -400,6 +420,9 @@ class Timer(object):
self.args = args
self.kwargs = kwargs
+ def __lt__(self, x):
+ return self.__cmp__(x) == -1
+
def __cmp__(self, x):
if x is self:
return 0
@@ -416,12 +439,17 @@ class Timer(object):
_timer_queue_lock = threading.Condition()
_timer_queue = []
+_timer_queue_alive = False
def _dispatch_timers():
+ global _timer_queue_alive
+
+ _timer_queue_alive = True
+
try:
with _timer_queue_lock:
- while True:
+ while _timer_queue_alive:
if not _timer_queue:
_timer_queue_lock.wait()
else:

Back to the top