Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraleherbau2011-05-31 09:20:47 -0400
committeraleherbau2011-05-31 09:20:47 -0400
commit6c66f7e4848a076f553c81bdb64dd85610af2d79 (patch)
tree23a07788884e728dbc5b7ad3e6eaef417a0837e4 /python/src
parent0e77355b3f6d073cbf77d596308a5a679e7c6f13 (diff)
downloadorg.eclipse.tcf-6c66f7e4848a076f553c81bdb64dd85610af2d79.tar.gz
org.eclipse.tcf-6c66f7e4848a076f553c81bdb64dd85610af2d79.tar.xz
org.eclipse.tcf-6c66f7e4848a076f553c81bdb64dd85610af2d79.zip
TCF Python: Start discovery on demand only
Diffstat (limited to 'python/src')
-rw-r--r--python/src/tcf/EventQueue.py5
-rw-r--r--python/src/tcf/protocol.py15
-rw-r--r--python/src/tcf/services/local/LocatorService.py54
-rw-r--r--python/src/tcf/shell.py12
4 files changed, 66 insertions, 20 deletions
diff --git a/python/src/tcf/EventQueue.py b/python/src/tcf/EventQueue.py
index c3084dde2..0efc5f60c 100644
--- a/python/src/tcf/EventQueue.py
+++ b/python/src/tcf/EventQueue.py
@@ -14,11 +14,12 @@ import protocol
class EventQueue(object):
- def __init__(self):
+ def __init__(self, on_shutdown=None):
self.__thread = threading.Thread(target=self, name="TCF Event Dispatcher")
self.__thread.daemon = True
self.__is_waiting = False
self.__is_shutdown = False
+ self.__on_shutdown = on_shutdown
self.__lock = threading.Condition()
self.__queue = []
@@ -27,6 +28,8 @@ class EventQueue(object):
def shutdown(self):
try:
+ if self.__on_shutdown:
+ self.__on_shutdown()
with self.__lock:
self.__is_shutdown = True
if self.__is_waiting:
diff --git a/python/src/tcf/protocol.py b/python/src/tcf/protocol.py
index b6644cd8d..1f94ff406 100644
--- a/python/src/tcf/protocol.py
+++ b/python/src/tcf/protocol.py
@@ -28,7 +28,7 @@ _timer_dispatcher = None
def startEventQueue():
global _event_queue, _timer_dispatcher
if _event_queue and not _event_queue.isShutdown(): return
- _event_queue = EventQueue()
+ _event_queue = EventQueue(on_shutdown=shutdownDiscovery)
_event_queue.start()
# initialize LocatorService
from services.local.LocatorService import LocatorService
@@ -159,6 +159,19 @@ def log(msg, x=None):
else:
_logger.log(msg, x)
+def startDiscovery():
+ "Start discovery of remote peers if not running yet"
+ # initialize LocatorService
+ from services.local.LocatorService import LocatorService
+ if LocatorService.locator:
+ invokeAndWait(LocatorService.startup)
+
+def shutdownDiscovery():
+ "Shutdown discovery if running"
+ from tcf.services.local.LocatorService import LocatorService
+ if LocatorService.locator:
+ invokeAndWait(LocatorService.shutdown)
+
def getLocator():
"""
Get instance of the framework locator service.
diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py
index 678b0e663..41a1517fc 100644
--- a/python/src/tcf/services/local/LocatorService.py
+++ b/python/src/tcf/services/local/LocatorService.py
@@ -122,28 +122,51 @@ class LocatorService(locator.LocatorService):
peers = {} # str->Peer
listeners = [] # list of LocatorListener
error_log = set() # set of str
- _error_log_lock = threading.RLock()
addr_cache = {} # str->AddressCacheItem
- _addr_cache_lock = threading.Condition()
addr_request = False
local_peer = None
last_master_packet_time = 0
+ @classmethod
+ def getLocalPeer(cls):
+ return cls.local_peer
+
+ @classmethod
+ def getListeners(cls):
+ return cls.listeners[:]
+
+ @classmethod
+ def startup(cls):
+ if cls.locator:
+ cls.locator._startup()
+
+ @classmethod
+ def shutdown(cls):
+ if cls.locator:
+ cls.locator._shutdown()
+
def __init__(self):
+ self._error_log_lock = threading.RLock()
+ self._alive = False
+ LocatorService.locator = self
+ LocatorService.local_peer = peer.LocalPeer()
+
+ def _startup(self):
+ if self._alive: return
+ self._alive = True
+ self._addr_cache_lock = threading.Condition()
self.subnets = set()
self.slaves = []
self.inp_buf = bytearray(MAX_PACKET_SIZE)
self.out_buf = bytearray(MAX_PACKET_SIZE)
service = self
- LocatorService.locator = self
- LocatorService.local_peer = peer.LocalPeer()
class TimerThread(threading.Thread):
def __init__(self, callable):
self._callable = callable
super(TimerThread, self).__init__()
def run(self):
- while True:
+ while service._alive:
try:
time.sleep(locator.DATA_RETENTION_PERIOD / 4 / 1000.)
protocol.invokeAndWait(self._callable)
@@ -155,12 +178,12 @@ class LocatorService(locator.LocatorService):
self.timer_thread = TimerThread(self.__refresh_timer)
class DNSLookupThread(threading.Thread):
def run(self):
- while True:
+ while service._alive:
try:
itemSet = None
- with LocatorService._addr_cache_lock:
+ with service._addr_cache_lock:
if not LocatorService.addr_request:
- LocatorService._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD)
+ service._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD)
msec = int(time.time() * 1000)
for a in LocatorService.addr_cache.values():
if a.time_stamp + locator.DATA_RETENTION_PERIOD * 10 < msec:
@@ -177,7 +200,7 @@ class LocatorService(locator.LocatorService):
addr = socket.gethostbyname(a.host)
except socket.gaierror:
pass
- with LocatorService._addr_cache_lock:
+ with service._addr_cache_lock:
if addr is None:
a.address = None
else:
@@ -193,7 +216,7 @@ class LocatorService(locator.LocatorService):
super(InputThread, self).__init__()
def run(self):
try:
- while True:
+ while service._alive:
sock = service.socket
try:
data, addr = sock.recvfrom(MAX_PACKET_SIZE)
@@ -246,13 +269,9 @@ class LocatorService(locator.LocatorService):
except Exception as x:
self._log("Cannot open UDP socket for TCF discovery protocol", x)
- @classmethod
- def getLocalPeer(cls):
- return cls.local_peer
-
- @classmethod
- def getListeners(cls):
- return cls.listeners[:]
+ def _shutdown(self):
+ if self._alive:
+ self._alive = False
def __makeErrorReport(self, code, msg):
err = {}
@@ -288,6 +307,7 @@ class LocatorService(locator.LocatorService):
channel.terminate(x)
def _log(self, msg, x):
+ if not self._alive: return
# Don't report same error multiple times to avoid filling up the log file.
with self._error_log_lock:
if msg in self.error_log: return
diff --git a/python/src/tcf/shell.py b/python/src/tcf/shell.py
index aa3958603..8042abcd2 100644
--- a/python/src/tcf/shell.py
+++ b/python/src/tcf/shell.py
@@ -48,15 +48,25 @@ class Shell(code.InteractiveConsole, protocol.ChannelOpenListener, channel.Chann
"connect" : tcf.connect,
"peers" : print_peers()
}
- sys.ps1 = "tcf> "
protocol.startEventQueue()
+ protocol.startDiscovery()
protocol.invokeAndWait(protocol.addChannelOpenListener, self)
code.InteractiveConsole.__init__(self, locals)
def interact(self, banner=None):
try:
+ try:
+ ps1 = sys.ps1 #@UndefinedVariable
+ except AttributeError:
+ ps1 = None
+ sys.ps1 = "tcf> "
super(Shell, self).interact(banner)
finally:
+ if ps1:
+ sys.ps1 = ps1
+ else:
+ del sys.ps1
protocol.invokeLater(protocol.removeChannelOpenListener, self)
+ protocol.shutdownDiscovery()
protocol.getEventQueue().shutdown()
def onChannelOpen(self, channel):
wrapper = sync.DispatchWrapper(channel)

Back to the top