Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'python/src/tcf/services/local/LocatorService.py')
-rw-r--r--python/src/tcf/services/local/LocatorService.py388
1 files changed, 265 insertions, 123 deletions
diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py
index dffd3556a..5587d60b8 100644
--- a/python/src/tcf/services/local/LocatorService.py
+++ b/python/src/tcf/services/local/LocatorService.py
@@ -1,5 +1,5 @@
# *****************************************************************************
-# * Copyright (c) 2011, 2012 Wind River Systems, Inc. and others.
+# * Copyright (c) 2011, 2013 Wind River Systems, Inc. and others.
# * All rights reserved. This program and the accompanying materials
# * are made available under the terms of the Eclipse Public License v1.0
# * which accompanies this distribution, and is available at
@@ -20,11 +20,11 @@ import threading
import time
import socket
import cStringIO
-from tcf.services import locator
-from tcf.util import logging
-from tcf.channel import fromJSONSequence, toJSONSequence
-from tcf.channel.ChannelProxy import ChannelProxy
-from tcf import protocol, services, channel, peer, errors
+from .. import locator
+from ...util import logging
+from ...channel import fromJSONSequence, toJSONSequence
+from ...channel.ChannelProxy import ChannelProxy
+from ... import protocol, services, channel, peer, errors
# Flag indicating whether tracing of the the discovery activity is enabled.
__TRACE_DISCOVERY__ = False
@@ -38,10 +38,12 @@ class SubNet(object):
self.last_slaves_req_time = 0
def contains(self, addr):
- if addr is None or self.address is None: return False
+ if addr is None or self.address is None:
+ return False
a1 = addr.getAddress()
a2 = self.address.getAddress()
- if len(a1) != len(a2): return False
+ if len(a1) != len(a2):
+ return False
i = 0
if self.prefix_length <= len(a1) * 8:
l = self.prefix_length
@@ -49,87 +51,112 @@ class SubNet(object):
l = len(a1) * 8
while i + 8 <= l:
n = i / 8
- if a1[n] != a2[n]: return False
+ if a1[n] != a2[n]:
+ return False
i += 8
while i < l:
n = i / 8
m = 1 << (7 - i % 8)
- if (a1[n] & m) != (a2[n] & m): return False
+ if (a1[n] & m) != (a2[n] & m):
+ return False
i += 1
return True
+
def __eq__(self, o):
- if not isinstance(o, SubNet): return False
+ if not isinstance(o, SubNet):
+ return False
return self.prefix_length == o.prefix_length and \
self.broadcast == o.broadcast and \
self.address == o.address
+
def __hash__(self):
return hash(self.address)
+
def __str__(self):
return "%s/%d" % (self.address.getHostAddress(), self.prefix_length)
+
class Slave(object):
# Time of last packet receiver from self slave
last_packet_time = 0
- #Time of last REQ_SLAVES packet received from self slave
+ # Time of last REQ_SLAVES packet received from self slave
last_req_slaves_time = 0
def __init__(self, address, port):
self.address = address
self.port = port
+
def __str__(self):
return "%s/%d" % (self.address.getHostAddress(), self.port)
+
class AddressCacheItem(object):
address = None
time_stamp = 0
used = False
+
def __init__(self, host):
self.host = host
+
class InetAddress(object):
"Mimicking Java InetAddress class"
def __init__(self, host, addr):
self.host = host
self.addr = addr
+
def getAddress(self):
return socket.inet_aton(self.addr)
+
def getHostAddress(self):
return self.addr
+
def __eq__(self, other):
- if not isinstance(other, InetAddress): return False
+ if not isinstance(other, InetAddress):
+ return False
return self.addr == other.addr
+
def __str__(self):
return "%s/%s" % (self.host or "", self.addr)
+
class InputPacket(object):
"Wrapper for UDP packet data."
def __init__(self, data, addr, port):
self.data = data
self.addr = addr
self.port = port
+
def getLength(self):
return len(self.data)
+
def getData(self):
return self.data
+
def getPort(self):
return self.port
+
def getAddress(self):
return self.addr
+
def __str__(self):
- return "[address=%s,port=%d,data=\"%s\"]" % (self.getAddress(), self.getPort(), self.data)
+ return "[address=%s,port=%d,data=\"%s\"]" % \
+ (self.getAddress(), self.getPort(), self.data)
DISCOVEY_PORT = 1534
MAX_PACKET_SIZE = 9000 - 40 - 8
PREF_PACKET_SIZE = 1500 - 40 - 8
# TODO: research usage of DNS-SD (DNS Service Discovery) to discover TCF peers
+
+
class LocatorService(locator.LocatorService):
locator = None
- peers = {} # str->Peer
- listeners = [] # list of LocatorListener
- error_log = set() # set of str
+ peers = {} # str->Peer
+ listeners = [] # list of LocatorListener
+ error_log = set() # set of str
addr_list = []
- addr_cache = {} # str->AddressCacheItem
+ addr_cache = {} # str->AddressCacheItem
addr_request = False
local_peer = None
last_master_packet_time = 0
@@ -159,7 +186,8 @@ class LocatorService(locator.LocatorService):
LocatorService.local_peer = peer.LocalPeer()
def _startup(self):
- if self._alive: return
+ if self._alive:
+ return
self._alive = True
self._addr_cache_lock = threading.Condition()
self.subnets = set()
@@ -167,10 +195,12 @@ class LocatorService(locator.LocatorService):
self.inp_buf = bytearray(MAX_PACKET_SIZE)
self.out_buf = bytearray(MAX_PACKET_SIZE)
service = self
+
class TimerThread(threading.Thread):
def __init__(self, _callable):
self._callable = _callable
super(TimerThread, self).__init__()
+
def run(self):
while service._alive:
try:
@@ -180,21 +210,25 @@ class LocatorService(locator.LocatorService):
# TCF event dispatch is shut down
return
except Exception as x:
- service._log("Unhandled exception in TCF discovery timer thread", x)
+ service._log("Unhandled exception in TCF discovery " +
+ "timer thread", x)
self.timer_thread = TimerThread(self.__refresh_timer)
+
class DNSLookupThread(threading.Thread):
def run(self):
while service._alive:
try:
itemSet = None
with service._addr_cache_lock:
+ period = locator.DATA_RETENTION_PERIOD
if not LocatorService.addr_request:
- service._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD)
+ service._addr_cache_lock.wait(period)
msec = int(time.time() * 1000)
for host, a in LocatorService.addr_cache.items():
- if a.time_stamp + locator.DATA_RETENTION_PERIOD * 10 < msec:
+ if a.time_stamp + period * 10 < msec:
if a.used:
- if itemSet is None: itemSet = set()
+ if itemSet is None:
+ itemSet = set()
itemSet.add(a)
else:
del LocatorService.addr_cache[host]
@@ -214,32 +248,40 @@ class LocatorService(locator.LocatorService):
a.time_stamp = msec
a.used = False
except Exception as x:
- service._log("Unhandled exception in TCF discovery DNS lookup thread", x)
+ service._log("Unhandled exception in TCF discovery " +
+ "DNS lookup thread", x)
self.dns_lookup_thread = DNSLookupThread()
+
class InputThread(threading.Thread):
def __init__(self, _callable):
self._callable = _callable
super(InputThread, self).__init__()
+
def run(self):
try:
while service._alive:
sock = service.socket
try:
data, addr = sock.recvfrom(MAX_PACKET_SIZE)
- p = InputPacket(data, InetAddress(None, addr[0]), addr[1])
+ p = InputPacket(data, InetAddress(None, addr[0]),
+ addr[1])
protocol.invokeAndWait(self._callable, p)
except RuntimeError:
# TCF event dispatch is shutdown
return
except socket.error as x:
- if sock != service.socket: continue
+ if sock != service.socket:
+ continue
# frequent error on windows, unknown reason
- if x.errno == 10054: continue
+ if x.errno == 10054:
+ continue
port = sock.getsockname()[1]
- service._log("Cannot read from datagram socket at port %d" % port, x)
+ service._log("Cannot read from datagram socket " +
+ "at port %d" % port, x)
time.sleep(2)
except Exception as x:
- service._log("Unhandled exception in socket reading thread", x)
+ service._log("Unhandled exception in socket reading " +
+ "thread", x)
self.input_thread = InputThread(self.__handleDatagramPacket)
try:
self.loopback_addr = InetAddress(None, "127.0.0.1")
@@ -248,11 +290,13 @@ class LocatorService(locator.LocatorService):
try:
self.socket.bind(('', DISCOVEY_PORT))
if __TRACE_DISCOVERY__:
- logging.trace("Became the master agent (bound to port %d)" % self.socket.getsockname()[1])
+ logging.trace("Became the master agent (bound to port " +
+ "%d)" % self.socket.getsockname()[1])
except socket.error as x:
self.socket.bind(('', 0))
if __TRACE_DISCOVERY__:
- logging.trace("Became a slave agent (bound to port %d)" % self.socket.getsockname()[1])
+ logging.trace("Became a slave agent (bound to port " +
+ "%d)" % self.socket.getsockname()[1])
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.input_thread.setName("TCF Locator Receiver")
self.timer_thread.setName("TCF Locator Timer")
@@ -263,11 +307,14 @@ class LocatorService(locator.LocatorService):
self.input_thread.start()
self.timer_thread.start()
self.dns_lookup_thread.start()
+
class LocatorListener(locator.LocatorListener):
def peerAdded(self, peer):
service._sendPeerInfo(peer, None, 0)
+
def peerChanged(self, peer):
service._sendPeerInfo(peer, None, 0)
+
self.listeners.append(LocatorListener())
self.__refreshSubNetList()
self.__sendPeersRequest(None, 0)
@@ -292,12 +339,15 @@ class LocatorService(locator.LocatorService):
peer_id = fromJSONSequence(data)[0]
_peer = self.peers.get(peer_id)
if _peer is None:
- error = self.__makeErrorReport(errors.TCF_ERROR_UNKNOWN_PEER, "Unknown peer ID")
+ errNum = errors.TCF_ERROR_UNKNOWN_PEER
+ error = self.__makeErrorReport(errNum, "Unknown peer ID")
channel.sendResult(token, toJSONSequence((error,)))
return
channel.sendResult(token, toJSONSequence((None,)))
if isinstance(_peer, peer.LocalPeer):
- channel.sendEvent(protocol.getLocator(), "Hello", toJSONSequence((channel.getLocalServices(),)))
+ seq = (channel.getLocalServices(),)
+ channel.sendEvent(protocol.getLocator(), "Hello",
+ toJSONSequence(seq))
return
ChannelProxy(channel, _peer.openChannel())
elif name == "sync":
@@ -313,15 +363,19 @@ class LocatorService(locator.LocatorService):
channel.terminate(x)
def _log(self, msg, x):
- if not self._alive: return
- # Don't report same error multiple times to avoid filling up the log file.
+ if not self._alive:
+ return
+ # Don't report same error multiple times to avoid filling up the log
+ # file.
with self._error_log_lock:
- if msg in self.error_log: return
+ if msg in self.error_log:
+ return
self.error_log.add(msg)
protocol.log(msg, x)
def __getInetAddress(self, host):
- if not host: return None
+ if not host:
+ return None
with self._addr_cache_lock:
i = self.addr_cache.get(host)
if i is None:
@@ -335,7 +389,8 @@ class LocatorService(locator.LocatorService):
pass
i.time_stamp = int(time.time() * 1000)
else:
- # socket.gethostbyname() can cause long delay - delegate to background thread
+ # socket.gethostbyname() can cause long delay - delegate to
+ # background thread
LocatorService.addr_request = True
self._addr_cache_lock.notify()
self.addr_cache[host] = i
@@ -359,15 +414,18 @@ class LocatorService(locator.LocatorService):
for p in self.peers.values():
if isinstance(p, peer.RemotePeer):
if p.getLastUpdateTime() + locator.DATA_RETENTION_PERIOD < tm:
- if stale_peers == None: stale_peers = []
+ if stale_peers == None:
+ stale_peers = []
stale_peers.append(p)
if stale_peers is not None:
- for p in stale_peers: p.dispose()
+ for p in stale_peers:
+ p.dispose()
# Try to become a master
port = self.socket.getsockname()[1]
+ period = locator.DATA_RETENTION_PERIOD / 2
if port != DISCOVEY_PORT and \
- self.last_master_packet_time + locator.DATA_RETENTION_PERIOD / 2 <= tm:
+ self.last_master_packet_time + period <= tm:
s0 = self.socket
s1 = None
try:
@@ -387,7 +445,8 @@ class LocatorService(locator.LocatorService):
def __addSlave(self, addr, port, timestamp, time_now):
for s in self.slaves:
if s.port == port and s.address == addr:
- if s.last_packet_time < timestamp: s.last_packet_time = timestamp
+ if s.last_packet_time < timestamp:
+ s.last_packet_time = timestamp
return s
s = Slave(addr, port)
s.last_packet_time = timestamp
@@ -404,16 +463,19 @@ class LocatorService(locator.LocatorService):
except Exception as x:
self._log("Cannot get list of network interfaces", x)
for s in tuple(self.subnets):
- if s in subNetSet: continue
+ if s in subNetSet:
+ continue
self.subnets.remove(s)
for s in subNetSet:
- if s in self.subnets: continue
+ if s in self.subnets:
+ continue
self.subnets.add(s)
if __TRACE_DISCOVERY__:
buf = cStringIO.StringIO()
buf.write("Refreshed subnet list:")
for subnet in self.subnets:
- buf.write("\n\t* address=%s, broadcast=%s" % (subnet.address, subnet.broadcast))
+ buf.write("\n\t* address=%s, broadcast=%s" % \
+ (subnet.address, subnet.broadcast))
logging.trace(buf.getvalue())
def __getAllIpAddresses (self) :
@@ -427,7 +489,7 @@ class LocatorService(locator.LocatorService):
ifcfg = struct.unpack \
(
'iL',
- fcntl.ioctl (s.fileno(), 0x8912, # @UndefinedVariable
+ fcntl.ioctl (s.fileno(), 0x8912, # @UndefinedVariable
struct.pack ('iL', nBytes, names.buffer_info ()[0]))
)[0]
@@ -475,7 +537,8 @@ class LocatorService(locator.LocatorService):
for address in self.addr_list:
rawaddr = socket.inet_aton(address)
- if len(rawaddr) != 4: continue
+ if len(rawaddr) != 4:
+ continue
rawaddr = rawaddr[:3] + '\xFF'
broadcast = socket.inet_ntoa(rawaddr)
_set.add(SubNet(24, InetAddress(hostname, address),
@@ -500,10 +563,16 @@ class LocatorService(locator.LocatorService):
addr = subnet.broadcast
port = DISCOVEY_PORT
for slave in self.slaves:
- self.__sendDatagramPacket(subnet, size, slave.address, slave.port)
- if not subnet.contains(addr): return False
- if port == self.socket.getsockname()[1] and addr == subnet.address: return False
- self.socket.sendto(str(self.out_buf[:size]), (addr.getHostAddress(), port))
+ self.__sendDatagramPacket(subnet, size, slave.address,
+ slave.port)
+ if not subnet.contains(addr):
+ return False
+ if port == self.socket.getsockname()[1] and \
+ addr == subnet.address:
+ return False
+
+ self.socket.sendto(str(self.out_buf[:size]),
+ (addr.getHostAddress(), port))
if __TRACE_DISCOVERY__:
attrs = None
@@ -513,7 +582,9 @@ class LocatorService(locator.LocatorService):
attrs = self.__parseIDs(self.out_buf, size)
elif self.out_buf[4] == locator.CONF_PEERS_REMOVED:
attrs = self.__parseIDs(self.out_buf, size)
- self.__traceDiscoveryPacket(False, self.packetTypes[self.out_buf[4]], attrs, addr, port)
+ self.__traceDiscoveryPacket(False,
+ self.packetTypes[self.out_buf[4]],
+ attrs, addr, port)
except Exception as x:
self._log("Cannot send datagram packet to %s" % addr, x)
return False
@@ -534,13 +605,17 @@ class LocatorService(locator.LocatorService):
i = 0
while i < l:
i0 = i
- while i < l and s[i] != '=' and s[i] != '\0': i += 1
+ while i < l and s[i] != '=' and s[i] != '\0':
+ i += 1
i1 = i
- if i < l and s[i] == '=': i += 1
+ if i < l and s[i] == '=':
+ i += 1
i2 = i
- while i < l and s[i] != '\0': i += 1
+ while i < l and s[i] != '\0':
+ i += 1
i3 = i
- if i < l and s[i] == '\0': i += 1
+ if i < l and s[i] == '\0':
+ i += 1
key = s[i0:i1]
val = s[i2:i3]
attrs[key] = val
@@ -548,8 +623,9 @@ class LocatorService(locator.LocatorService):
def __parseIDs(self, data, size):
"""
- Parse list of IDs in CONF_SLAVES_INFO and CONF_PEERS_REMOVED packet data.
-
+ Parse list of IDs in CONF_SLAVES_INFO and CONF_PEERS_REMOVED packet
+ data.
+
@param data - the packet data
@param size - the packet size
@return a map containing the IDs
@@ -561,12 +637,14 @@ class LocatorService(locator.LocatorService):
i = 0
while i < l:
i0 = i
- while i < l and s[i] != '\0': i += 1
+ while i < l and s[i] != '\0':
+ i += 1
if i > i0:
_id = s[i0:i]
attrs[str(cnt)] = _id
cnt += 1
- while i < l and s[i] == '\0': i += 1
+ while i < l and s[i] == '\0':
+ i += 1
return attrs
def __sendPeersRequest(self, addr, port):
@@ -577,17 +655,23 @@ class LocatorService(locator.LocatorService):
def _sendPeerInfo(self, _peer, addr, port):
attrs = _peer.getAttributes()
peer_addr = self.__getInetAddress(attrs.get(peer.ATTR_IP_HOST))
- if peer_addr is None: return
- if attrs.get(peer.ATTR_IP_PORT) is None: return
+ if peer_addr is None:
+ return
+ if attrs.get(peer.ATTR_IP_PORT) is None:
+ return
self.out_buf[4] = locator.CONF_PEER_INFO
i = 8
for subnet in self.subnets:
if isinstance(_peer, peer.RemotePeer):
- if self.socket.getsockname()[1] != DISCOVEY_PORT: return
- if not subnet.address == self.loopback_addr and not subnet.address == peer_addr: continue
+ if self.socket.getsockname()[1] != DISCOVEY_PORT:
+ return
+ if not subnet.address == self.loopback_addr and \
+ not subnet.address == peer_addr:
+ continue
if not subnet.address == self.loopback_addr:
- if not subnet.contains(peer_addr): continue
+ if not subnet.contains(peer_addr):
+ continue
if i == 8:
sb = cStringIO.StringIO()
for key in attrs.keys():
@@ -596,21 +680,27 @@ class LocatorService(locator.LocatorService):
sb.write(attrs.get(key))
sb.write('\0')
bt = self.__getUTF8Bytes(sb.getvalue())
- if i + len(bt) > len(self.out_buf): return
+ if i + len(bt) > len(self.out_buf):
+ return
self.out_buf[i:i + len(bt)] = bt
i += len(bt)
- if self.__sendDatagramPacket(subnet, i, addr, port): subnet.send_all_ok = True
+ if self.__sendDatagramPacket(subnet, i, addr, port):
+ subnet.send_all_ok = True
def __sendEmptyPacket(self, addr, port):
self.out_buf[4] = locator.CONF_SLAVES_INFO
for subnet in self.subnets:
- if subnet.send_all_ok: continue
+ if subnet.send_all_ok:
+ continue
self.__sendDatagramPacket(subnet, 8, addr, port)
def __sendAll(self, addr, port, sl, tm):
- for subnet in self.subnets: subnet.send_all_ok = False
- for peer in self.peers.values(): self._sendPeerInfo(peer, addr, port)
- if addr is not None and sl is not None and sl.last_req_slaves_time + locator.DATA_RETENTION_PERIOD >= tm:
+ for subnet in self.subnets:
+ subnet.send_all_ok = False
+ for peer in self.peers.values():
+ self._sendPeerInfo(peer, addr, port)
+ if addr is not None and sl is not None and \
+ sl.last_req_slaves_time + locator.DATA_RETENTION_PERIOD >= tm:
self.__sendSlavesInfo(addr, port, tm)
self.__sendEmptyPacket(addr, port)
@@ -620,10 +710,12 @@ class LocatorService(locator.LocatorService):
def __sendSlaveInfo(self, x, tm):
ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm
- if ttl <= 0: return
+ if ttl <= 0:
+ return
self.out_buf[4] = locator.CONF_SLAVES_INFO
for subnet in self.subnets:
- if not subnet.contains(x.address): continue
+ if not subnet.contains(x.address):
+ continue
i = 8
s = "%d:%s:%s" % (ttl, x.port, x.address.getHostAddress())
bt = self.__getUTF8Bytes(s)
@@ -632,23 +724,30 @@ class LocatorService(locator.LocatorService):
self.out_buf[i] = 0
i += 1
for y in self.slaves:
- if not subnet.contains(y.address): continue
- if y.last_req_slaves_time + locator.DATA_RETENTION_PERIOD < tm: continue
+ if not subnet.contains(y.address):
+ continue
+ if y.last_req_slaves_time + locator.DATA_RETENTION_PERIOD < tm:
+ continue
self.__sendDatagramPacket(subnet, i, y.address, y.port)
def __sendSlavesInfo(self, addr, port, tm):
self.out_buf[4] = locator.CONF_SLAVES_INFO
for subnet in self.subnets:
- if not subnet.contains(addr): continue
+ if not subnet.contains(addr):
+ continue
i = 8
for x in self.slaves:
ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm
- if ttl <= 0: return
- if x.port == port and x.address == addr: continue
+ if ttl <= 0:
+ return
+ if x.port == port and x.address == addr:
+ continue
if not subnet.address == self.loopback_addr:
- if not subnet.contains(x.address): continue
+ if not subnet.contains(x.address):
+ continue
subnet.send_all_ok = True
- s = "%d:%s:%s" % (x.last_packet_time, x.port, x.address.getHostAddress())
+ s = "%d:%s:%s" % \
+ (x.last_packet_time, x.port, x.address.getHostAddress())
bt = self.__getUTF8Bytes(s)
if i > 8 and i + len(bt) >= PREF_PACKET_SIZE:
self.__sendDatagramPacket(subnet, i, addr, port)
@@ -657,12 +756,15 @@ class LocatorService(locator.LocatorService):
i += len(bt)
self.out_buf[i] = 0
i += 1
- if i > 8: self.__sendDatagramPacket(subnet, i, addr, port)
+ if i > 8:
+ self.__sendDatagramPacket(subnet, i, addr, port)
def __isRemote(self, address, port):
- if port != self.socket.getsockname()[1]: return True
+ if port != self.socket.getsockname()[1]:
+ return True
for s in self.subnets:
- if s.address == address: return False
+ if s.address == address:
+ return False
return True
def __handleDatagramPacket(self, p):
@@ -670,11 +772,16 @@ class LocatorService(locator.LocatorService):
tm = int(time.time() * 1000)
buf = p.getData()
length = p.getLength()
- if length < 8: return
- if buf[0] != 'T': return
- if buf[1] != 'C': return
- if buf[2] != 'F': return
- if buf[3] != locator.CONF_VERSION: return
+ if length < 8:
+ return
+ if buf[0] != 'T':
+ return
+ if buf[1] != 'C':
+ return
+ if buf[2] != 'F':
+ return
+ if buf[3] != locator.CONF_VERSION:
+ return
remote_port = p.getPort()
remote_address = p.getAddress()
if self.__isRemote(remote_address, remote_port):
@@ -683,7 +790,8 @@ class LocatorService(locator.LocatorService):
else:
sl = None
if remote_port != DISCOVEY_PORT:
- sl = self.__addSlave(remote_address, remote_port, tm, tm)
+ sl = self.__addSlave(remote_address, remote_port, tm,
+ tm)
code = ord(buf[4])
if code == locator.CONF_PEER_INFO:
self.__handlePeerInfoPacket(p)
@@ -694,24 +802,32 @@ class LocatorService(locator.LocatorService):
elif code == locator.CONF_REQ_SLAVES:
self.__handleReqSlavesPacket(p, sl, tm)
for subnet in self.subnets:
- if not subnet.contains(remote_address): continue
+ if not subnet.contains(remote_address):
+ continue
delay = locator.DATA_RETENTION_PERIOD / 3
- if remote_port != DISCOVEY_PORT: delay = locator.DATA_RETENTION_PERIOD / 32
- elif subnet.address != remote_address: delay = locator.DATA_RETENTION_PERIOD / 2
+ if remote_port != DISCOVEY_PORT:
+ delay = locator.DATA_RETENTION_PERIOD / 32
+ elif subnet.address != remote_address:
+ delay = locator.DATA_RETENTION_PERIOD / 2
if subnet.last_slaves_req_time + delay <= tm:
- self.__sendSlavesRequest(subnet, remote_address, remote_port)
+ self.__sendSlavesRequest(subnet, remote_address,
+ remote_port)
subnet.last_slaves_req_time = tm
- if subnet.address == remote_address and remote_port == DISCOVEY_PORT:
+ if subnet.address == remote_address and \
+ remote_port == DISCOVEY_PORT:
self.last_master_packet_time = tm
except Exception as x:
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
+ self._log("Invalid datagram packet received from %s/%s" % \
+ (p.getAddress(), p.getPort()), x)
def __handlePeerInfoPacket(self, p):
try:
attrs = self.__parsePeerAttributes(p.getData(), p.getLength())
- if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_PEER_INFO", attrs, p)
+ if __TRACE_DISCOVERY__:
+ self.__traceDiscoveryPacket(True, "CONF_PEER_INFO", attrs, p)
_id = attrs.get(peer.ATTR_ID)
- if _id is None: raise RuntimeError("Invalid peer info: no ID")
+ if _id is None:
+ raise RuntimeError("Invalid peer info: no ID")
ok = True
host = attrs.get(peer.ATTR_IP_HOST)
if host is not None:
@@ -729,7 +845,8 @@ class LocatorService(locator.LocatorService):
elif _peer is None:
peer.RemotePeer(attrs)
except Exception as x:
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
+ self._log("Invalid datagram packet received from %s/%s" % \
+ (p.getAddress(), p.getPort()), x)
def __handleReqInfoPacket(self, p, sl, tm):
if __TRACE_DISCOVERY__:
@@ -739,20 +856,26 @@ class LocatorService(locator.LocatorService):
def __handleSlavesInfoPacket(self, p, time_now):
try:
attrs = self.__parseIDs(p.getData(), p.getLength())
- if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", attrs, p)
+ if __TRACE_DISCOVERY__:
+ self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", attrs, p)
for s in attrs.values():
i = 0
l = len(s)
time0 = i
- while i < l and s[i] != ':' and s[i] != '\0': i += 1
+ while i < l and s[i] != ':' and s[i] != '\0':
+ i += 1
time1 = i
- if i < l and s[i] == ':': i += 1
+ if i < l and s[i] == ':':
+ i += 1
port0 = i
- while i < l and s[i] != ':' and s[i] != '\0': i += 1
+ while i < l and s[i] != ':' and s[i] != '\0':
+ i += 1
port1 = i
- if i < l and s[i] == ':': i += 1
+ if i < l and s[i] == ':':
+ i += 1
host0 = i
- while i < l and s[i] != '\0': i += 1
+ while i < l and s[i] != '\0':
+ i += 1
host1 = i
port = int(s[port0:port1])
timestamp = s[time0:time1]
@@ -760,44 +883,60 @@ class LocatorService(locator.LocatorService):
if port != DISCOVEY_PORT:
addr = self.__getInetAddress(host)
if addr is not None:
- delta = 10006030 # 30 minutes
+ delta = 10006030 # 30 minutes
if len(timestamp) > 0:
time_val = int(timestamp)
else:
time_val = time_now
if time_val < 3600000:
# Time stamp is "time to live" in milliseconds
- time_val = time_now + time_val / 1000 - locator.DATA_RETENTION_PERIOD
+ time_val = time_now + time_val / 1000 - \
+ locator.DATA_RETENTION_PERIOD
elif time_val < time_now / 1000 + 50000000:
# Time stamp is in seconds
time_val = 1000
else:
# Time stamp is in milliseconds
pass
- if time_val < time_now - delta or time_val > time_now + delta:
+ if time_val < time_now - delta or \
+ time_val > time_now + delta:
msg = "Invalid slave info timestamp: %s -> %s" % (
- timestamp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_val / 1000.)))
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), Exception(msg))
- time_val = time_now - locator.DATA_RETENTION_PERIOD / 2
+ timestamp,
+ time.strftime("%Y-%m-%d %H:%M:%S",
+ time.localtime(time_val /
+ 1000.)))
+
+ self._log("Invalid datagram packet received " +
+ "from %s/%s" % (
+ p.getAddress(), p.getPort()),
+ Exception(msg))
+ time_val = time_now - \
+ locator.DATA_RETENTION_PERIOD / 2
self.__addSlave(addr, port, time_val, time_now)
except Exception as x:
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
+ self._log("Invalid datagram packet received from " +
+ "%s/%s" % (p.getAddress(), p.getPort()), x)
def __handleReqSlavesPacket(self, p, sl, tm):
- if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p)
- if sl is not None: sl.last_req_slaves_time = tm
+ if __TRACE_DISCOVERY__:
+ self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p)
+ if sl is not None:
+ sl.last_req_slaves_time = tm
self.__sendSlavesInfo(p.getAddress(), p.getPort(), tm)
def __handlePeerRemovedPacket(self, p):
try:
attrs = self.__parseIDs(p.getData(), p.getLength())
- if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_PEERS_REMOVED", attrs, p)
+ if __TRACE_DISCOVERY__:
+ self.__traceDiscoveryPacket(True, "CONF_PEERS_REMOVED", attrs,
+ p)
for _id in attrs.values():
_peer = self.peers.get(_id)
- if isinstance(_peer, peer.RemotePeer): _peer.dispose()
+ if isinstance(_peer, peer.RemotePeer):
+ _peer.dispose()
except Exception as x:
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
-
+ self._log("Invalid datagram packet received from %s/%s" % (
+ p.getAddress(), p.getPort()), x)
@classmethod
def getLocator(cls):
@@ -823,19 +962,21 @@ class LocatorService(locator.LocatorService):
self.listeners.remove(listener)
@classmethod
- def __traceDiscoveryPacket(cls, received, packet_type, attrs, addr, port=None):
+ def __traceDiscoveryPacket(cls, received, packet_type, attrs, addr,
+ port=None):
"""
Log that a TCF Discovery packet has be sent or received. The trace is
- sent to stdout. This should be called only if the tracing has been turned
- on.
+ sent to stdout. This should be called only if the tracing has been
+ turned on.
@param received
True if the packet was sent, otherwise it was received
@param packet_type
- a string specifying the type of packet, e.g., "CONF_PEER_INFO"
+ a string specifying the type of packet, e.g.,
+ "CONF_PEER_INFO"
@param attrs
- a set of attributes relevant to the type of packet (typically
- a peer's attributes)
+ a set of attributes relevant to the type of packet
+ (typically a peer's attributes)
@param addr
the network address the packet is being sent to
@param port
@@ -855,6 +996,7 @@ class LocatorService(locator.LocatorService):
buf.write("\n\t%s=%s" % (key, value))
logging.trace(buf.getvalue())
+
class LocatorServiceProvider(services.ServiceProvider):
def getLocalService(self, _channel):
class CommandServer(channel.CommandServer):

Back to the top