Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'python/src/tcf/services/local/LocatorService.py')
-rw-r--r--python/src/tcf/services/local/LocatorService.py54
1 files changed, 37 insertions, 17 deletions
diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py
index 678b0e663..41a1517fc 100644
--- a/python/src/tcf/services/local/LocatorService.py
+++ b/python/src/tcf/services/local/LocatorService.py
@@ -122,28 +122,51 @@ class LocatorService(locator.LocatorService):
peers = {} # str->Peer
listeners = [] # list of LocatorListener
error_log = set() # set of str
- _error_log_lock = threading.RLock()
addr_cache = {} # str->AddressCacheItem
- _addr_cache_lock = threading.Condition()
addr_request = False
local_peer = None
last_master_packet_time = 0
+ @classmethod
+ def getLocalPeer(cls):
+ return cls.local_peer
+
+ @classmethod
+ def getListeners(cls):
+ return cls.listeners[:]
+
+ @classmethod
+ def startup(cls):
+ if cls.locator:
+ cls.locator._startup()
+
+ @classmethod
+ def shutdown(cls):
+ if cls.locator:
+ cls.locator._shutdown()
+
def __init__(self):
+ self._error_log_lock = threading.RLock()
+ self._alive = False
+ LocatorService.locator = self
+ LocatorService.local_peer = peer.LocalPeer()
+
+ def _startup(self):
+ if self._alive: return
+ self._alive = True
+ self._addr_cache_lock = threading.Condition()
self.subnets = set()
self.slaves = []
self.inp_buf = bytearray(MAX_PACKET_SIZE)
self.out_buf = bytearray(MAX_PACKET_SIZE)
service = self
- LocatorService.locator = self
- LocatorService.local_peer = peer.LocalPeer()
class TimerThread(threading.Thread):
def __init__(self, callable):
self._callable = callable
super(TimerThread, self).__init__()
def run(self):
- while True:
+ while service._alive:
try:
time.sleep(locator.DATA_RETENTION_PERIOD / 4 / 1000.)
protocol.invokeAndWait(self._callable)
@@ -155,12 +178,12 @@ class LocatorService(locator.LocatorService):
self.timer_thread = TimerThread(self.__refresh_timer)
class DNSLookupThread(threading.Thread):
def run(self):
- while True:
+ while service._alive:
try:
itemSet = None
- with LocatorService._addr_cache_lock:
+ with service._addr_cache_lock:
if not LocatorService.addr_request:
- LocatorService._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD)
+ service._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD)
msec = int(time.time() * 1000)
for a in LocatorService.addr_cache.values():
if a.time_stamp + locator.DATA_RETENTION_PERIOD * 10 < msec:
@@ -177,7 +200,7 @@ class LocatorService(locator.LocatorService):
addr = socket.gethostbyname(a.host)
except socket.gaierror:
pass
- with LocatorService._addr_cache_lock:
+ with service._addr_cache_lock:
if addr is None:
a.address = None
else:
@@ -193,7 +216,7 @@ class LocatorService(locator.LocatorService):
super(InputThread, self).__init__()
def run(self):
try:
- while True:
+ while service._alive:
sock = service.socket
try:
data, addr = sock.recvfrom(MAX_PACKET_SIZE)
@@ -246,13 +269,9 @@ class LocatorService(locator.LocatorService):
except Exception as x:
self._log("Cannot open UDP socket for TCF discovery protocol", x)
- @classmethod
- def getLocalPeer(cls):
- return cls.local_peer
-
- @classmethod
- def getListeners(cls):
- return cls.listeners[:]
+ def _shutdown(self):
+ if self._alive:
+ self._alive = False
def __makeErrorReport(self, code, msg):
err = {}
@@ -288,6 +307,7 @@ class LocatorService(locator.LocatorService):
channel.terminate(x)
def _log(self, msg, x):
+ if not self._alive: return
# Don't report same error multiple times to avoid filling up the log file.
with self._error_log_lock:
if msg in self.error_log: return

Back to the top