diff options
author | aleherbau | 2011-05-31 13:20:47 +0000 |
---|---|---|
committer | aleherbau | 2011-05-31 13:20:47 +0000 |
commit | 6c66f7e4848a076f553c81bdb64dd85610af2d79 (patch) | |
tree | 23a07788884e728dbc5b7ad3e6eaef417a0837e4 /python | |
parent | 0e77355b3f6d073cbf77d596308a5a679e7c6f13 (diff) | |
download | org.eclipse.tcf-6c66f7e4848a076f553c81bdb64dd85610af2d79.tar.gz org.eclipse.tcf-6c66f7e4848a076f553c81bdb64dd85610af2d79.tar.xz org.eclipse.tcf-6c66f7e4848a076f553c81bdb64dd85610af2d79.zip |
TCF Python: Start discovery on demand only
Diffstat (limited to 'python')
-rw-r--r-- | python/src/tcf/EventQueue.py | 5 | ||||
-rw-r--r-- | python/src/tcf/protocol.py | 15 | ||||
-rw-r--r-- | python/src/tcf/services/local/LocatorService.py | 54 | ||||
-rw-r--r-- | python/src/tcf/shell.py | 12 |
4 files changed, 66 insertions, 20 deletions
diff --git a/python/src/tcf/EventQueue.py b/python/src/tcf/EventQueue.py index c3084dde2..0efc5f60c 100644 --- a/python/src/tcf/EventQueue.py +++ b/python/src/tcf/EventQueue.py @@ -14,11 +14,12 @@ import protocol class EventQueue(object):
- def __init__(self):
+ def __init__(self, on_shutdown=None):
self.__thread = threading.Thread(target=self, name="TCF Event Dispatcher")
self.__thread.daemon = True
self.__is_waiting = False
self.__is_shutdown = False
+ self.__on_shutdown = on_shutdown
self.__lock = threading.Condition()
self.__queue = []
@@ -27,6 +28,8 @@ class EventQueue(object): def shutdown(self):
try:
+ if self.__on_shutdown:
+ self.__on_shutdown()
with self.__lock:
self.__is_shutdown = True
if self.__is_waiting:
diff --git a/python/src/tcf/protocol.py b/python/src/tcf/protocol.py index b6644cd8d..1f94ff406 100644 --- a/python/src/tcf/protocol.py +++ b/python/src/tcf/protocol.py @@ -28,7 +28,7 @@ _timer_dispatcher = None def startEventQueue(): global _event_queue, _timer_dispatcher if _event_queue and not _event_queue.isShutdown(): return - _event_queue = EventQueue() + _event_queue = EventQueue(on_shutdown=shutdownDiscovery) _event_queue.start() # initialize LocatorService from services.local.LocatorService import LocatorService @@ -159,6 +159,19 @@ def log(msg, x=None): else: _logger.log(msg, x) +def startDiscovery(): + "Start discovery of remote peers if not running yet" + # initialize LocatorService + from services.local.LocatorService import LocatorService + if LocatorService.locator: + invokeAndWait(LocatorService.startup) + +def shutdownDiscovery(): + "Shutdown discovery if running" + from tcf.services.local.LocatorService import LocatorService + if LocatorService.locator: + invokeAndWait(LocatorService.shutdown) + def getLocator(): """ Get instance of the framework locator service. diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py index 678b0e663..41a1517fc 100644 --- a/python/src/tcf/services/local/LocatorService.py +++ b/python/src/tcf/services/local/LocatorService.py @@ -122,28 +122,51 @@ class LocatorService(locator.LocatorService): peers = {} # str->Peer
listeners = [] # list of LocatorListener
error_log = set() # set of str
- _error_log_lock = threading.RLock()
addr_cache = {} # str->AddressCacheItem
- _addr_cache_lock = threading.Condition()
addr_request = False
local_peer = None
last_master_packet_time = 0
+ @classmethod
+ def getLocalPeer(cls):
+ return cls.local_peer
+
+ @classmethod
+ def getListeners(cls):
+ return cls.listeners[:]
+
+ @classmethod
+ def startup(cls):
+ if cls.locator:
+ cls.locator._startup()
+
+ @classmethod
+ def shutdown(cls):
+ if cls.locator:
+ cls.locator._shutdown()
+
def __init__(self):
+ self._error_log_lock = threading.RLock()
+ self._alive = False
+ LocatorService.locator = self
+ LocatorService.local_peer = peer.LocalPeer()
+
+ def _startup(self):
+ if self._alive: return
+ self._alive = True
+ self._addr_cache_lock = threading.Condition()
self.subnets = set()
self.slaves = []
self.inp_buf = bytearray(MAX_PACKET_SIZE)
self.out_buf = bytearray(MAX_PACKET_SIZE)
service = self
- LocatorService.locator = self
- LocatorService.local_peer = peer.LocalPeer()
class TimerThread(threading.Thread):
def __init__(self, callable):
self._callable = callable
super(TimerThread, self).__init__()
def run(self):
- while True:
+ while service._alive:
try:
time.sleep(locator.DATA_RETENTION_PERIOD / 4 / 1000.)
protocol.invokeAndWait(self._callable)
@@ -155,12 +178,12 @@ class LocatorService(locator.LocatorService): self.timer_thread = TimerThread(self.__refresh_timer)
class DNSLookupThread(threading.Thread):
def run(self):
- while True:
+ while service._alive:
try:
itemSet = None
- with LocatorService._addr_cache_lock:
+ with service._addr_cache_lock:
if not LocatorService.addr_request:
- LocatorService._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD)
+ service._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD)
msec = int(time.time() * 1000)
for a in LocatorService.addr_cache.values():
if a.time_stamp + locator.DATA_RETENTION_PERIOD * 10 < msec:
@@ -177,7 +200,7 @@ class LocatorService(locator.LocatorService): addr = socket.gethostbyname(a.host)
except socket.gaierror:
pass
- with LocatorService._addr_cache_lock:
+ with service._addr_cache_lock:
if addr is None:
a.address = None
else:
@@ -193,7 +216,7 @@ class LocatorService(locator.LocatorService): super(InputThread, self).__init__()
def run(self):
try:
- while True:
+ while service._alive:
sock = service.socket
try:
data, addr = sock.recvfrom(MAX_PACKET_SIZE)
@@ -246,13 +269,9 @@ class LocatorService(locator.LocatorService): except Exception as x:
self._log("Cannot open UDP socket for TCF discovery protocol", x)
- @classmethod
- def getLocalPeer(cls):
- return cls.local_peer
-
- @classmethod
- def getListeners(cls):
- return cls.listeners[:]
+ def _shutdown(self):
+ if self._alive:
+ self._alive = False
def __makeErrorReport(self, code, msg):
err = {}
@@ -288,6 +307,7 @@ class LocatorService(locator.LocatorService): channel.terminate(x)
def _log(self, msg, x):
+ if not self._alive: return
# Don't report same error multiple times to avoid filling up the log file.
with self._error_log_lock:
if msg in self.error_log: return
diff --git a/python/src/tcf/shell.py b/python/src/tcf/shell.py index aa3958603..8042abcd2 100644 --- a/python/src/tcf/shell.py +++ b/python/src/tcf/shell.py @@ -48,15 +48,25 @@ class Shell(code.InteractiveConsole, protocol.ChannelOpenListener, channel.Chann "connect" : tcf.connect,
"peers" : print_peers()
}
- sys.ps1 = "tcf> "
protocol.startEventQueue()
+ protocol.startDiscovery()
protocol.invokeAndWait(protocol.addChannelOpenListener, self)
code.InteractiveConsole.__init__(self, locals)
def interact(self, banner=None):
try:
+ try:
+ ps1 = sys.ps1 #@UndefinedVariable
+ except AttributeError:
+ ps1 = None
+ sys.ps1 = "tcf> "
super(Shell, self).interact(banner)
finally:
+ if ps1:
+ sys.ps1 = ps1
+ else:
+ del sys.ps1
protocol.invokeLater(protocol.removeChannelOpenListener, self)
+ protocol.shutdownDiscovery()
protocol.getEventQueue().shutdown()
def onChannelOpen(self, channel):
wrapper = sync.DispatchWrapper(channel)
|