diff options
Diffstat (limited to 'python/src/tcf/services/local/LocatorService.py')
-rw-r--r-- | python/src/tcf/services/local/LocatorService.py | 388 |
1 files changed, 265 insertions, 123 deletions
diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py index dffd3556a..5587d60b8 100644 --- a/python/src/tcf/services/local/LocatorService.py +++ b/python/src/tcf/services/local/LocatorService.py @@ -1,5 +1,5 @@ # ***************************************************************************** -# * Copyright (c) 2011, 2012 Wind River Systems, Inc. and others. +# * Copyright (c) 2011, 2013 Wind River Systems, Inc. and others. # * All rights reserved. This program and the accompanying materials # * are made available under the terms of the Eclipse Public License v1.0 # * which accompanies this distribution, and is available at @@ -20,11 +20,11 @@ import threading import time import socket import cStringIO -from tcf.services import locator -from tcf.util import logging -from tcf.channel import fromJSONSequence, toJSONSequence -from tcf.channel.ChannelProxy import ChannelProxy -from tcf import protocol, services, channel, peer, errors +from .. import locator +from ...util import logging +from ...channel import fromJSONSequence, toJSONSequence +from ...channel.ChannelProxy import ChannelProxy +from ... import protocol, services, channel, peer, errors # Flag indicating whether tracing of the the discovery activity is enabled. __TRACE_DISCOVERY__ = False @@ -38,10 +38,12 @@ class SubNet(object): self.last_slaves_req_time = 0 def contains(self, addr): - if addr is None or self.address is None: return False + if addr is None or self.address is None: + return False a1 = addr.getAddress() a2 = self.address.getAddress() - if len(a1) != len(a2): return False + if len(a1) != len(a2): + return False i = 0 if self.prefix_length <= len(a1) * 8: l = self.prefix_length @@ -49,87 +51,112 @@ class SubNet(object): l = len(a1) * 8 while i + 8 <= l: n = i / 8 - if a1[n] != a2[n]: return False + if a1[n] != a2[n]: + return False i += 8 while i < l: n = i / 8 m = 1 << (7 - i % 8) - if (a1[n] & m) != (a2[n] & m): return False + if (a1[n] & m) != (a2[n] & m): + return False i += 1 return True + def __eq__(self, o): - if not isinstance(o, SubNet): return False + if not isinstance(o, SubNet): + return False return self.prefix_length == o.prefix_length and \ self.broadcast == o.broadcast and \ self.address == o.address + def __hash__(self): return hash(self.address) + def __str__(self): return "%s/%d" % (self.address.getHostAddress(), self.prefix_length) + class Slave(object): # Time of last packet receiver from self slave last_packet_time = 0 - #Time of last REQ_SLAVES packet received from self slave + # Time of last REQ_SLAVES packet received from self slave last_req_slaves_time = 0 def __init__(self, address, port): self.address = address self.port = port + def __str__(self): return "%s/%d" % (self.address.getHostAddress(), self.port) + class AddressCacheItem(object): address = None time_stamp = 0 used = False + def __init__(self, host): self.host = host + class InetAddress(object): "Mimicking Java InetAddress class" def __init__(self, host, addr): self.host = host self.addr = addr + def getAddress(self): return socket.inet_aton(self.addr) + def getHostAddress(self): return self.addr + def __eq__(self, other): - if not isinstance(other, InetAddress): return False + if not isinstance(other, InetAddress): + return False return self.addr == other.addr + def __str__(self): return "%s/%s" % (self.host or "", self.addr) + class InputPacket(object): "Wrapper for UDP packet data." def __init__(self, data, addr, port): self.data = data self.addr = addr self.port = port + def getLength(self): return len(self.data) + def getData(self): return self.data + def getPort(self): return self.port + def getAddress(self): return self.addr + def __str__(self): - return "[address=%s,port=%d,data=\"%s\"]" % (self.getAddress(), self.getPort(), self.data) + return "[address=%s,port=%d,data=\"%s\"]" % \ + (self.getAddress(), self.getPort(), self.data) DISCOVEY_PORT = 1534 MAX_PACKET_SIZE = 9000 - 40 - 8 PREF_PACKET_SIZE = 1500 - 40 - 8 # TODO: research usage of DNS-SD (DNS Service Discovery) to discover TCF peers + + class LocatorService(locator.LocatorService): locator = None - peers = {} # str->Peer - listeners = [] # list of LocatorListener - error_log = set() # set of str + peers = {} # str->Peer + listeners = [] # list of LocatorListener + error_log = set() # set of str addr_list = [] - addr_cache = {} # str->AddressCacheItem + addr_cache = {} # str->AddressCacheItem addr_request = False local_peer = None last_master_packet_time = 0 @@ -159,7 +186,8 @@ class LocatorService(locator.LocatorService): LocatorService.local_peer = peer.LocalPeer() def _startup(self): - if self._alive: return + if self._alive: + return self._alive = True self._addr_cache_lock = threading.Condition() self.subnets = set() @@ -167,10 +195,12 @@ class LocatorService(locator.LocatorService): self.inp_buf = bytearray(MAX_PACKET_SIZE) self.out_buf = bytearray(MAX_PACKET_SIZE) service = self + class TimerThread(threading.Thread): def __init__(self, _callable): self._callable = _callable super(TimerThread, self).__init__() + def run(self): while service._alive: try: @@ -180,21 +210,25 @@ class LocatorService(locator.LocatorService): # TCF event dispatch is shut down return except Exception as x: - service._log("Unhandled exception in TCF discovery timer thread", x) + service._log("Unhandled exception in TCF discovery " + + "timer thread", x) self.timer_thread = TimerThread(self.__refresh_timer) + class DNSLookupThread(threading.Thread): def run(self): while service._alive: try: itemSet = None with service._addr_cache_lock: + period = locator.DATA_RETENTION_PERIOD if not LocatorService.addr_request: - service._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD) + service._addr_cache_lock.wait(period) msec = int(time.time() * 1000) for host, a in LocatorService.addr_cache.items(): - if a.time_stamp + locator.DATA_RETENTION_PERIOD * 10 < msec: + if a.time_stamp + period * 10 < msec: if a.used: - if itemSet is None: itemSet = set() + if itemSet is None: + itemSet = set() itemSet.add(a) else: del LocatorService.addr_cache[host] @@ -214,32 +248,40 @@ class LocatorService(locator.LocatorService): a.time_stamp = msec a.used = False except Exception as x: - service._log("Unhandled exception in TCF discovery DNS lookup thread", x) + service._log("Unhandled exception in TCF discovery " + + "DNS lookup thread", x) self.dns_lookup_thread = DNSLookupThread() + class InputThread(threading.Thread): def __init__(self, _callable): self._callable = _callable super(InputThread, self).__init__() + def run(self): try: while service._alive: sock = service.socket try: data, addr = sock.recvfrom(MAX_PACKET_SIZE) - p = InputPacket(data, InetAddress(None, addr[0]), addr[1]) + p = InputPacket(data, InetAddress(None, addr[0]), + addr[1]) protocol.invokeAndWait(self._callable, p) except RuntimeError: # TCF event dispatch is shutdown return except socket.error as x: - if sock != service.socket: continue + if sock != service.socket: + continue # frequent error on windows, unknown reason - if x.errno == 10054: continue + if x.errno == 10054: + continue port = sock.getsockname()[1] - service._log("Cannot read from datagram socket at port %d" % port, x) + service._log("Cannot read from datagram socket " + + "at port %d" % port, x) time.sleep(2) except Exception as x: - service._log("Unhandled exception in socket reading thread", x) + service._log("Unhandled exception in socket reading " + + "thread", x) self.input_thread = InputThread(self.__handleDatagramPacket) try: self.loopback_addr = InetAddress(None, "127.0.0.1") @@ -248,11 +290,13 @@ class LocatorService(locator.LocatorService): try: self.socket.bind(('', DISCOVEY_PORT)) if __TRACE_DISCOVERY__: - logging.trace("Became the master agent (bound to port %d)" % self.socket.getsockname()[1]) + logging.trace("Became the master agent (bound to port " + + "%d)" % self.socket.getsockname()[1]) except socket.error as x: self.socket.bind(('', 0)) if __TRACE_DISCOVERY__: - logging.trace("Became a slave agent (bound to port %d)" % self.socket.getsockname()[1]) + logging.trace("Became a slave agent (bound to port " + + "%d)" % self.socket.getsockname()[1]) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.input_thread.setName("TCF Locator Receiver") self.timer_thread.setName("TCF Locator Timer") @@ -263,11 +307,14 @@ class LocatorService(locator.LocatorService): self.input_thread.start() self.timer_thread.start() self.dns_lookup_thread.start() + class LocatorListener(locator.LocatorListener): def peerAdded(self, peer): service._sendPeerInfo(peer, None, 0) + def peerChanged(self, peer): service._sendPeerInfo(peer, None, 0) + self.listeners.append(LocatorListener()) self.__refreshSubNetList() self.__sendPeersRequest(None, 0) @@ -292,12 +339,15 @@ class LocatorService(locator.LocatorService): peer_id = fromJSONSequence(data)[0] _peer = self.peers.get(peer_id) if _peer is None: - error = self.__makeErrorReport(errors.TCF_ERROR_UNKNOWN_PEER, "Unknown peer ID") + errNum = errors.TCF_ERROR_UNKNOWN_PEER + error = self.__makeErrorReport(errNum, "Unknown peer ID") channel.sendResult(token, toJSONSequence((error,))) return channel.sendResult(token, toJSONSequence((None,))) if isinstance(_peer, peer.LocalPeer): - channel.sendEvent(protocol.getLocator(), "Hello", toJSONSequence((channel.getLocalServices(),))) + seq = (channel.getLocalServices(),) + channel.sendEvent(protocol.getLocator(), "Hello", + toJSONSequence(seq)) return ChannelProxy(channel, _peer.openChannel()) elif name == "sync": @@ -313,15 +363,19 @@ class LocatorService(locator.LocatorService): channel.terminate(x) def _log(self, msg, x): - if not self._alive: return - # Don't report same error multiple times to avoid filling up the log file. + if not self._alive: + return + # Don't report same error multiple times to avoid filling up the log + # file. with self._error_log_lock: - if msg in self.error_log: return + if msg in self.error_log: + return self.error_log.add(msg) protocol.log(msg, x) def __getInetAddress(self, host): - if not host: return None + if not host: + return None with self._addr_cache_lock: i = self.addr_cache.get(host) if i is None: @@ -335,7 +389,8 @@ class LocatorService(locator.LocatorService): pass i.time_stamp = int(time.time() * 1000) else: - # socket.gethostbyname() can cause long delay - delegate to background thread + # socket.gethostbyname() can cause long delay - delegate to + # background thread LocatorService.addr_request = True self._addr_cache_lock.notify() self.addr_cache[host] = i @@ -359,15 +414,18 @@ class LocatorService(locator.LocatorService): for p in self.peers.values(): if isinstance(p, peer.RemotePeer): if p.getLastUpdateTime() + locator.DATA_RETENTION_PERIOD < tm: - if stale_peers == None: stale_peers = [] + if stale_peers == None: + stale_peers = [] stale_peers.append(p) if stale_peers is not None: - for p in stale_peers: p.dispose() + for p in stale_peers: + p.dispose() # Try to become a master port = self.socket.getsockname()[1] + period = locator.DATA_RETENTION_PERIOD / 2 if port != DISCOVEY_PORT and \ - self.last_master_packet_time + locator.DATA_RETENTION_PERIOD / 2 <= tm: + self.last_master_packet_time + period <= tm: s0 = self.socket s1 = None try: @@ -387,7 +445,8 @@ class LocatorService(locator.LocatorService): def __addSlave(self, addr, port, timestamp, time_now): for s in self.slaves: if s.port == port and s.address == addr: - if s.last_packet_time < timestamp: s.last_packet_time = timestamp + if s.last_packet_time < timestamp: + s.last_packet_time = timestamp return s s = Slave(addr, port) s.last_packet_time = timestamp @@ -404,16 +463,19 @@ class LocatorService(locator.LocatorService): except Exception as x: self._log("Cannot get list of network interfaces", x) for s in tuple(self.subnets): - if s in subNetSet: continue + if s in subNetSet: + continue self.subnets.remove(s) for s in subNetSet: - if s in self.subnets: continue + if s in self.subnets: + continue self.subnets.add(s) if __TRACE_DISCOVERY__: buf = cStringIO.StringIO() buf.write("Refreshed subnet list:") for subnet in self.subnets: - buf.write("\n\t* address=%s, broadcast=%s" % (subnet.address, subnet.broadcast)) + buf.write("\n\t* address=%s, broadcast=%s" % \ + (subnet.address, subnet.broadcast)) logging.trace(buf.getvalue()) def __getAllIpAddresses (self) : @@ -427,7 +489,7 @@ class LocatorService(locator.LocatorService): ifcfg = struct.unpack \ ( 'iL', - fcntl.ioctl (s.fileno(), 0x8912, # @UndefinedVariable + fcntl.ioctl (s.fileno(), 0x8912, # @UndefinedVariable struct.pack ('iL', nBytes, names.buffer_info ()[0])) )[0] @@ -475,7 +537,8 @@ class LocatorService(locator.LocatorService): for address in self.addr_list: rawaddr = socket.inet_aton(address) - if len(rawaddr) != 4: continue + if len(rawaddr) != 4: + continue rawaddr = rawaddr[:3] + '\xFF' broadcast = socket.inet_ntoa(rawaddr) _set.add(SubNet(24, InetAddress(hostname, address), @@ -500,10 +563,16 @@ class LocatorService(locator.LocatorService): addr = subnet.broadcast port = DISCOVEY_PORT for slave in self.slaves: - self.__sendDatagramPacket(subnet, size, slave.address, slave.port) - if not subnet.contains(addr): return False - if port == self.socket.getsockname()[1] and addr == subnet.address: return False - self.socket.sendto(str(self.out_buf[:size]), (addr.getHostAddress(), port)) + self.__sendDatagramPacket(subnet, size, slave.address, + slave.port) + if not subnet.contains(addr): + return False + if port == self.socket.getsockname()[1] and \ + addr == subnet.address: + return False + + self.socket.sendto(str(self.out_buf[:size]), + (addr.getHostAddress(), port)) if __TRACE_DISCOVERY__: attrs = None @@ -513,7 +582,9 @@ class LocatorService(locator.LocatorService): attrs = self.__parseIDs(self.out_buf, size) elif self.out_buf[4] == locator.CONF_PEERS_REMOVED: attrs = self.__parseIDs(self.out_buf, size) - self.__traceDiscoveryPacket(False, self.packetTypes[self.out_buf[4]], attrs, addr, port) + self.__traceDiscoveryPacket(False, + self.packetTypes[self.out_buf[4]], + attrs, addr, port) except Exception as x: self._log("Cannot send datagram packet to %s" % addr, x) return False @@ -534,13 +605,17 @@ class LocatorService(locator.LocatorService): i = 0 while i < l: i0 = i - while i < l and s[i] != '=' and s[i] != '\0': i += 1 + while i < l and s[i] != '=' and s[i] != '\0': + i += 1 i1 = i - if i < l and s[i] == '=': i += 1 + if i < l and s[i] == '=': + i += 1 i2 = i - while i < l and s[i] != '\0': i += 1 + while i < l and s[i] != '\0': + i += 1 i3 = i - if i < l and s[i] == '\0': i += 1 + if i < l and s[i] == '\0': + i += 1 key = s[i0:i1] val = s[i2:i3] attrs[key] = val @@ -548,8 +623,9 @@ class LocatorService(locator.LocatorService): def __parseIDs(self, data, size): """ - Parse list of IDs in CONF_SLAVES_INFO and CONF_PEERS_REMOVED packet data. - + Parse list of IDs in CONF_SLAVES_INFO and CONF_PEERS_REMOVED packet + data. + @param data - the packet data @param size - the packet size @return a map containing the IDs @@ -561,12 +637,14 @@ class LocatorService(locator.LocatorService): i = 0 while i < l: i0 = i - while i < l and s[i] != '\0': i += 1 + while i < l and s[i] != '\0': + i += 1 if i > i0: _id = s[i0:i] attrs[str(cnt)] = _id cnt += 1 - while i < l and s[i] == '\0': i += 1 + while i < l and s[i] == '\0': + i += 1 return attrs def __sendPeersRequest(self, addr, port): @@ -577,17 +655,23 @@ class LocatorService(locator.LocatorService): def _sendPeerInfo(self, _peer, addr, port): attrs = _peer.getAttributes() peer_addr = self.__getInetAddress(attrs.get(peer.ATTR_IP_HOST)) - if peer_addr is None: return - if attrs.get(peer.ATTR_IP_PORT) is None: return + if peer_addr is None: + return + if attrs.get(peer.ATTR_IP_PORT) is None: + return self.out_buf[4] = locator.CONF_PEER_INFO i = 8 for subnet in self.subnets: if isinstance(_peer, peer.RemotePeer): - if self.socket.getsockname()[1] != DISCOVEY_PORT: return - if not subnet.address == self.loopback_addr and not subnet.address == peer_addr: continue + if self.socket.getsockname()[1] != DISCOVEY_PORT: + return + if not subnet.address == self.loopback_addr and \ + not subnet.address == peer_addr: + continue if not subnet.address == self.loopback_addr: - if not subnet.contains(peer_addr): continue + if not subnet.contains(peer_addr): + continue if i == 8: sb = cStringIO.StringIO() for key in attrs.keys(): @@ -596,21 +680,27 @@ class LocatorService(locator.LocatorService): sb.write(attrs.get(key)) sb.write('\0') bt = self.__getUTF8Bytes(sb.getvalue()) - if i + len(bt) > len(self.out_buf): return + if i + len(bt) > len(self.out_buf): + return self.out_buf[i:i + len(bt)] = bt i += len(bt) - if self.__sendDatagramPacket(subnet, i, addr, port): subnet.send_all_ok = True + if self.__sendDatagramPacket(subnet, i, addr, port): + subnet.send_all_ok = True def __sendEmptyPacket(self, addr, port): self.out_buf[4] = locator.CONF_SLAVES_INFO for subnet in self.subnets: - if subnet.send_all_ok: continue + if subnet.send_all_ok: + continue self.__sendDatagramPacket(subnet, 8, addr, port) def __sendAll(self, addr, port, sl, tm): - for subnet in self.subnets: subnet.send_all_ok = False - for peer in self.peers.values(): self._sendPeerInfo(peer, addr, port) - if addr is not None and sl is not None and sl.last_req_slaves_time + locator.DATA_RETENTION_PERIOD >= tm: + for subnet in self.subnets: + subnet.send_all_ok = False + for peer in self.peers.values(): + self._sendPeerInfo(peer, addr, port) + if addr is not None and sl is not None and \ + sl.last_req_slaves_time + locator.DATA_RETENTION_PERIOD >= tm: self.__sendSlavesInfo(addr, port, tm) self.__sendEmptyPacket(addr, port) @@ -620,10 +710,12 @@ class LocatorService(locator.LocatorService): def __sendSlaveInfo(self, x, tm): ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm - if ttl <= 0: return + if ttl <= 0: + return self.out_buf[4] = locator.CONF_SLAVES_INFO for subnet in self.subnets: - if not subnet.contains(x.address): continue + if not subnet.contains(x.address): + continue i = 8 s = "%d:%s:%s" % (ttl, x.port, x.address.getHostAddress()) bt = self.__getUTF8Bytes(s) @@ -632,23 +724,30 @@ class LocatorService(locator.LocatorService): self.out_buf[i] = 0 i += 1 for y in self.slaves: - if not subnet.contains(y.address): continue - if y.last_req_slaves_time + locator.DATA_RETENTION_PERIOD < tm: continue + if not subnet.contains(y.address): + continue + if y.last_req_slaves_time + locator.DATA_RETENTION_PERIOD < tm: + continue self.__sendDatagramPacket(subnet, i, y.address, y.port) def __sendSlavesInfo(self, addr, port, tm): self.out_buf[4] = locator.CONF_SLAVES_INFO for subnet in self.subnets: - if not subnet.contains(addr): continue + if not subnet.contains(addr): + continue i = 8 for x in self.slaves: ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm - if ttl <= 0: return - if x.port == port and x.address == addr: continue + if ttl <= 0: + return + if x.port == port and x.address == addr: + continue if not subnet.address == self.loopback_addr: - if not subnet.contains(x.address): continue + if not subnet.contains(x.address): + continue subnet.send_all_ok = True - s = "%d:%s:%s" % (x.last_packet_time, x.port, x.address.getHostAddress()) + s = "%d:%s:%s" % \ + (x.last_packet_time, x.port, x.address.getHostAddress()) bt = self.__getUTF8Bytes(s) if i > 8 and i + len(bt) >= PREF_PACKET_SIZE: self.__sendDatagramPacket(subnet, i, addr, port) @@ -657,12 +756,15 @@ class LocatorService(locator.LocatorService): i += len(bt) self.out_buf[i] = 0 i += 1 - if i > 8: self.__sendDatagramPacket(subnet, i, addr, port) + if i > 8: + self.__sendDatagramPacket(subnet, i, addr, port) def __isRemote(self, address, port): - if port != self.socket.getsockname()[1]: return True + if port != self.socket.getsockname()[1]: + return True for s in self.subnets: - if s.address == address: return False + if s.address == address: + return False return True def __handleDatagramPacket(self, p): @@ -670,11 +772,16 @@ class LocatorService(locator.LocatorService): tm = int(time.time() * 1000) buf = p.getData() length = p.getLength() - if length < 8: return - if buf[0] != 'T': return - if buf[1] != 'C': return - if buf[2] != 'F': return - if buf[3] != locator.CONF_VERSION: return + if length < 8: + return + if buf[0] != 'T': + return + if buf[1] != 'C': + return + if buf[2] != 'F': + return + if buf[3] != locator.CONF_VERSION: + return remote_port = p.getPort() remote_address = p.getAddress() if self.__isRemote(remote_address, remote_port): @@ -683,7 +790,8 @@ class LocatorService(locator.LocatorService): else: sl = None if remote_port != DISCOVEY_PORT: - sl = self.__addSlave(remote_address, remote_port, tm, tm) + sl = self.__addSlave(remote_address, remote_port, tm, + tm) code = ord(buf[4]) if code == locator.CONF_PEER_INFO: self.__handlePeerInfoPacket(p) @@ -694,24 +802,32 @@ class LocatorService(locator.LocatorService): elif code == locator.CONF_REQ_SLAVES: self.__handleReqSlavesPacket(p, sl, tm) for subnet in self.subnets: - if not subnet.contains(remote_address): continue + if not subnet.contains(remote_address): + continue delay = locator.DATA_RETENTION_PERIOD / 3 - if remote_port != DISCOVEY_PORT: delay = locator.DATA_RETENTION_PERIOD / 32 - elif subnet.address != remote_address: delay = locator.DATA_RETENTION_PERIOD / 2 + if remote_port != DISCOVEY_PORT: + delay = locator.DATA_RETENTION_PERIOD / 32 + elif subnet.address != remote_address: + delay = locator.DATA_RETENTION_PERIOD / 2 if subnet.last_slaves_req_time + delay <= tm: - self.__sendSlavesRequest(subnet, remote_address, remote_port) + self.__sendSlavesRequest(subnet, remote_address, + remote_port) subnet.last_slaves_req_time = tm - if subnet.address == remote_address and remote_port == DISCOVEY_PORT: + if subnet.address == remote_address and \ + remote_port == DISCOVEY_PORT: self.last_master_packet_time = tm except Exception as x: - self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x) + self._log("Invalid datagram packet received from %s/%s" % \ + (p.getAddress(), p.getPort()), x) def __handlePeerInfoPacket(self, p): try: attrs = self.__parsePeerAttributes(p.getData(), p.getLength()) - if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_PEER_INFO", attrs, p) + if __TRACE_DISCOVERY__: + self.__traceDiscoveryPacket(True, "CONF_PEER_INFO", attrs, p) _id = attrs.get(peer.ATTR_ID) - if _id is None: raise RuntimeError("Invalid peer info: no ID") + if _id is None: + raise RuntimeError("Invalid peer info: no ID") ok = True host = attrs.get(peer.ATTR_IP_HOST) if host is not None: @@ -729,7 +845,8 @@ class LocatorService(locator.LocatorService): elif _peer is None: peer.RemotePeer(attrs) except Exception as x: - self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x) + self._log("Invalid datagram packet received from %s/%s" % \ + (p.getAddress(), p.getPort()), x) def __handleReqInfoPacket(self, p, sl, tm): if __TRACE_DISCOVERY__: @@ -739,20 +856,26 @@ class LocatorService(locator.LocatorService): def __handleSlavesInfoPacket(self, p, time_now): try: attrs = self.__parseIDs(p.getData(), p.getLength()) - if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", attrs, p) + if __TRACE_DISCOVERY__: + self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", attrs, p) for s in attrs.values(): i = 0 l = len(s) time0 = i - while i < l and s[i] != ':' and s[i] != '\0': i += 1 + while i < l and s[i] != ':' and s[i] != '\0': + i += 1 time1 = i - if i < l and s[i] == ':': i += 1 + if i < l and s[i] == ':': + i += 1 port0 = i - while i < l and s[i] != ':' and s[i] != '\0': i += 1 + while i < l and s[i] != ':' and s[i] != '\0': + i += 1 port1 = i - if i < l and s[i] == ':': i += 1 + if i < l and s[i] == ':': + i += 1 host0 = i - while i < l and s[i] != '\0': i += 1 + while i < l and s[i] != '\0': + i += 1 host1 = i port = int(s[port0:port1]) timestamp = s[time0:time1] @@ -760,44 +883,60 @@ class LocatorService(locator.LocatorService): if port != DISCOVEY_PORT: addr = self.__getInetAddress(host) if addr is not None: - delta = 10006030 # 30 minutes + delta = 10006030 # 30 minutes if len(timestamp) > 0: time_val = int(timestamp) else: time_val = time_now if time_val < 3600000: # Time stamp is "time to live" in milliseconds - time_val = time_now + time_val / 1000 - locator.DATA_RETENTION_PERIOD + time_val = time_now + time_val / 1000 - \ + locator.DATA_RETENTION_PERIOD elif time_val < time_now / 1000 + 50000000: # Time stamp is in seconds time_val = 1000 else: # Time stamp is in milliseconds pass - if time_val < time_now - delta or time_val > time_now + delta: + if time_val < time_now - delta or \ + time_val > time_now + delta: msg = "Invalid slave info timestamp: %s -> %s" % ( - timestamp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_val / 1000.))) - self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), Exception(msg)) - time_val = time_now - locator.DATA_RETENTION_PERIOD / 2 + timestamp, + time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime(time_val / + 1000.))) + + self._log("Invalid datagram packet received " + + "from %s/%s" % ( + p.getAddress(), p.getPort()), + Exception(msg)) + time_val = time_now - \ + locator.DATA_RETENTION_PERIOD / 2 self.__addSlave(addr, port, time_val, time_now) except Exception as x: - self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x) + self._log("Invalid datagram packet received from " + + "%s/%s" % (p.getAddress(), p.getPort()), x) def __handleReqSlavesPacket(self, p, sl, tm): - if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p) - if sl is not None: sl.last_req_slaves_time = tm + if __TRACE_DISCOVERY__: + self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p) + if sl is not None: + sl.last_req_slaves_time = tm self.__sendSlavesInfo(p.getAddress(), p.getPort(), tm) def __handlePeerRemovedPacket(self, p): try: attrs = self.__parseIDs(p.getData(), p.getLength()) - if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_PEERS_REMOVED", attrs, p) + if __TRACE_DISCOVERY__: + self.__traceDiscoveryPacket(True, "CONF_PEERS_REMOVED", attrs, + p) for _id in attrs.values(): _peer = self.peers.get(_id) - if isinstance(_peer, peer.RemotePeer): _peer.dispose() + if isinstance(_peer, peer.RemotePeer): + _peer.dispose() except Exception as x: - self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x) - + self._log("Invalid datagram packet received from %s/%s" % ( + p.getAddress(), p.getPort()), x) @classmethod def getLocator(cls): @@ -823,19 +962,21 @@ class LocatorService(locator.LocatorService): self.listeners.remove(listener) @classmethod - def __traceDiscoveryPacket(cls, received, packet_type, attrs, addr, port=None): + def __traceDiscoveryPacket(cls, received, packet_type, attrs, addr, + port=None): """ Log that a TCF Discovery packet has be sent or received. The trace is - sent to stdout. This should be called only if the tracing has been turned - on. + sent to stdout. This should be called only if the tracing has been + turned on. @param received True if the packet was sent, otherwise it was received @param packet_type - a string specifying the type of packet, e.g., "CONF_PEER_INFO" + a string specifying the type of packet, e.g., + "CONF_PEER_INFO" @param attrs - a set of attributes relevant to the type of packet (typically - a peer's attributes) + a set of attributes relevant to the type of packet + (typically a peer's attributes) @param addr the network address the packet is being sent to @param port @@ -855,6 +996,7 @@ class LocatorService(locator.LocatorService): buf.write("\n\t%s=%s" % (key, value)) logging.trace(buf.getvalue()) + class LocatorServiceProvider(services.ServiceProvider): def getLocalService(self, _channel): class CommandServer(channel.CommandServer): |