From 6c66f7e4848a076f553c81bdb64dd85610af2d79 Mon Sep 17 00:00:00 2001 From: aleherbau Date: Tue, 31 May 2011 13:20:47 +0000 Subject: TCF Python: Start discovery on demand only --- python/src/tcf/EventQueue.py | 5 ++- python/src/tcf/protocol.py | 15 ++++++- python/src/tcf/services/local/LocatorService.py | 54 +++++++++++++++++-------- python/src/tcf/shell.py | 12 +++++- 4 files changed, 66 insertions(+), 20 deletions(-) (limited to 'python') diff --git a/python/src/tcf/EventQueue.py b/python/src/tcf/EventQueue.py index c3084dde2..0efc5f60c 100644 --- a/python/src/tcf/EventQueue.py +++ b/python/src/tcf/EventQueue.py @@ -14,11 +14,12 @@ import protocol class EventQueue(object): - def __init__(self): + def __init__(self, on_shutdown=None): self.__thread = threading.Thread(target=self, name="TCF Event Dispatcher") self.__thread.daemon = True self.__is_waiting = False self.__is_shutdown = False + self.__on_shutdown = on_shutdown self.__lock = threading.Condition() self.__queue = [] @@ -27,6 +28,8 @@ class EventQueue(object): def shutdown(self): try: + if self.__on_shutdown: + self.__on_shutdown() with self.__lock: self.__is_shutdown = True if self.__is_waiting: diff --git a/python/src/tcf/protocol.py b/python/src/tcf/protocol.py index b6644cd8d..1f94ff406 100644 --- a/python/src/tcf/protocol.py +++ b/python/src/tcf/protocol.py @@ -28,7 +28,7 @@ _timer_dispatcher = None def startEventQueue(): global _event_queue, _timer_dispatcher if _event_queue and not _event_queue.isShutdown(): return - _event_queue = EventQueue() + _event_queue = EventQueue(on_shutdown=shutdownDiscovery) _event_queue.start() # initialize LocatorService from services.local.LocatorService import LocatorService @@ -159,6 +159,19 @@ def log(msg, x=None): else: _logger.log(msg, x) +def startDiscovery(): + "Start discovery of remote peers if not running yet" + # initialize LocatorService + from services.local.LocatorService import LocatorService + if LocatorService.locator: + invokeAndWait(LocatorService.startup) + +def shutdownDiscovery(): + "Shutdown discovery if running" + from tcf.services.local.LocatorService import LocatorService + if LocatorService.locator: + invokeAndWait(LocatorService.shutdown) + def getLocator(): """ Get instance of the framework locator service. diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py index 678b0e663..41a1517fc 100644 --- a/python/src/tcf/services/local/LocatorService.py +++ b/python/src/tcf/services/local/LocatorService.py @@ -122,28 +122,51 @@ class LocatorService(locator.LocatorService): peers = {} # str->Peer listeners = [] # list of LocatorListener error_log = set() # set of str - _error_log_lock = threading.RLock() addr_cache = {} # str->AddressCacheItem - _addr_cache_lock = threading.Condition() addr_request = False local_peer = None last_master_packet_time = 0 + @classmethod + def getLocalPeer(cls): + return cls.local_peer + + @classmethod + def getListeners(cls): + return cls.listeners[:] + + @classmethod + def startup(cls): + if cls.locator: + cls.locator._startup() + + @classmethod + def shutdown(cls): + if cls.locator: + cls.locator._shutdown() + def __init__(self): + self._error_log_lock = threading.RLock() + self._alive = False + LocatorService.locator = self + LocatorService.local_peer = peer.LocalPeer() + + def _startup(self): + if self._alive: return + self._alive = True + self._addr_cache_lock = threading.Condition() self.subnets = set() self.slaves = [] self.inp_buf = bytearray(MAX_PACKET_SIZE) self.out_buf = bytearray(MAX_PACKET_SIZE) service = self - LocatorService.locator = self - LocatorService.local_peer = peer.LocalPeer() class TimerThread(threading.Thread): def __init__(self, callable): self._callable = callable super(TimerThread, self).__init__() def run(self): - while True: + while service._alive: try: time.sleep(locator.DATA_RETENTION_PERIOD / 4 / 1000.) protocol.invokeAndWait(self._callable) @@ -155,12 +178,12 @@ class LocatorService(locator.LocatorService): self.timer_thread = TimerThread(self.__refresh_timer) class DNSLookupThread(threading.Thread): def run(self): - while True: + while service._alive: try: itemSet = None - with LocatorService._addr_cache_lock: + with service._addr_cache_lock: if not LocatorService.addr_request: - LocatorService._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD) + service._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD) msec = int(time.time() * 1000) for a in LocatorService.addr_cache.values(): if a.time_stamp + locator.DATA_RETENTION_PERIOD * 10 < msec: @@ -177,7 +200,7 @@ class LocatorService(locator.LocatorService): addr = socket.gethostbyname(a.host) except socket.gaierror: pass - with LocatorService._addr_cache_lock: + with service._addr_cache_lock: if addr is None: a.address = None else: @@ -193,7 +216,7 @@ class LocatorService(locator.LocatorService): super(InputThread, self).__init__() def run(self): try: - while True: + while service._alive: sock = service.socket try: data, addr = sock.recvfrom(MAX_PACKET_SIZE) @@ -246,13 +269,9 @@ class LocatorService(locator.LocatorService): except Exception as x: self._log("Cannot open UDP socket for TCF discovery protocol", x) - @classmethod - def getLocalPeer(cls): - return cls.local_peer - - @classmethod - def getListeners(cls): - return cls.listeners[:] + def _shutdown(self): + if self._alive: + self._alive = False def __makeErrorReport(self, code, msg): err = {} @@ -288,6 +307,7 @@ class LocatorService(locator.LocatorService): channel.terminate(x) def _log(self, msg, x): + if not self._alive: return # Don't report same error multiple times to avoid filling up the log file. with self._error_log_lock: if msg in self.error_log: return diff --git a/python/src/tcf/shell.py b/python/src/tcf/shell.py index aa3958603..8042abcd2 100644 --- a/python/src/tcf/shell.py +++ b/python/src/tcf/shell.py @@ -48,15 +48,25 @@ class Shell(code.InteractiveConsole, protocol.ChannelOpenListener, channel.Chann "connect" : tcf.connect, "peers" : print_peers() } - sys.ps1 = "tcf> " protocol.startEventQueue() + protocol.startDiscovery() protocol.invokeAndWait(protocol.addChannelOpenListener, self) code.InteractiveConsole.__init__(self, locals) def interact(self, banner=None): try: + try: + ps1 = sys.ps1 #@UndefinedVariable + except AttributeError: + ps1 = None + sys.ps1 = "tcf> " super(Shell, self).interact(banner) finally: + if ps1: + sys.ps1 = ps1 + else: + del sys.ps1 protocol.invokeLater(protocol.removeChannelOpenListener, self) + protocol.shutdownDiscovery() protocol.getEventQueue().shutdown() def onChannelOpen(self, channel): wrapper = sync.DispatchWrapper(channel) -- cgit v1.2.3