Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'python/src/tcf/services/local/LocatorService.py')
-rw-r--r--python/src/tcf/services/local/LocatorService.py172
1 files changed, 101 insertions, 71 deletions
diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py
index 44480b634..86e038e58 100644
--- a/python/src/tcf/services/local/LocatorService.py
+++ b/python/src/tcf/services/local/LocatorService.py
@@ -19,12 +19,12 @@ import platform
import threading
import time
import socket
-import cStringIO
+
from .. import locator
from ...util import logging
from ...channel import fromJSONSequence, toJSONSequence
from ...channel.ChannelProxy import ChannelProxy
-from ... import protocol, services, channel, peer, errors
+from ... import compat, protocol, services, channel, peer, errors
# Flag indicating whether tracing of the the discovery activity is enabled.
__TRACE_DISCOVERY__ = False
@@ -50,12 +50,12 @@ class SubNet(object):
else:
l = len(a1) * 8
while i + 8 <= l:
- n = i / 8
+ n = int(i / 8)
if a1[n] != a2[n]:
return False
i += 8
while i < l:
- n = i / 8
+ n = int(i / 8)
m = 1 << (7 - i % 8)
if (a1[n] & m) != (a2[n] & m):
return False
@@ -119,6 +119,9 @@ class InetAddress(object):
def __str__(self):
return "%s/%s" % (self.host or "", self.addr)
+ def __hash__(self):
+ return hash(self.addr)
+
class InputPacket(object):
"Wrapper for UDP packet data."
@@ -216,6 +219,7 @@ class LocatorService(locator.LocatorService):
class DNSLookupThread(threading.Thread):
def run(self):
+
while service._alive:
try:
itemSet = None
@@ -224,7 +228,8 @@ class LocatorService(locator.LocatorService):
if not LocatorService.addr_request:
service._addr_cache_lock.wait(period)
msec = int(time.time() * 1000)
- for host, a in LocatorService.addr_cache.items():
+ items = list(LocatorService.addr_cache.items())
+ for host, a in items:
if a.time_stamp + period * 10 < msec:
if a.used:
if itemSet is None:
@@ -263,9 +268,11 @@ class LocatorService(locator.LocatorService):
sock = service.socket
try:
data, addr = sock.recvfrom(MAX_PACKET_SIZE)
- p = InputPacket(data, InetAddress(None, addr[0]),
- addr[1])
- protocol.invokeAndWait(self._callable, p)
+ if addr and len(addr) >= 2:
+ p = InputPacket(data,
+ InetAddress(None, addr[0]),
+ addr[1])
+ protocol.invokeAndWait(self._callable, p)
except RuntimeError:
# TCF event dispatch is shutdown
return
@@ -285,7 +292,8 @@ class LocatorService(locator.LocatorService):
self.input_thread = InputThread(self.__handleDatagramPacket)
try:
self.loopback_addr = InetAddress(None, "127.0.0.1")
- self.out_buf[0:8] = 'TCF%s\0\0\0\0' % locator.CONF_VERSION
+ tcfversion = 'TCF%s\0\0\0\0' % locator.CONF_VERSION
+ self.out_buf[0:8] = [ord(c) for c in tcfversion]
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
self.socket.bind(('', DISCOVEY_PORT))
@@ -325,6 +333,19 @@ class LocatorService(locator.LocatorService):
def _shutdown(self):
if self._alive:
self._alive = False
+ try:
+ self.socket.shutdown(socket.SHUT_RDWR)
+ except Exception:
+ pass
+ try:
+ self.socket.close()
+ except Exception:
+ pass
+ try:
+ with self._addr_cache_lock:
+ self._addr_cache_lock.notify_all()
+ except Exception:
+ pass
def __makeErrorReport(self, code, msg):
err = {}
@@ -354,7 +375,7 @@ class LocatorService(locator.LocatorService):
channel.sendResult(token, None)
elif name == "getPeers":
arr = []
- for p in self.peers.values():
+ for p in list(self.peers.values()):
arr.append(p.getAttributes())
channel.sendResult(token, toJSONSequence((None, arr)))
else:
@@ -367,10 +388,16 @@ class LocatorService(locator.LocatorService):
return
# Don't report same error multiple times to avoid filling up the log
# file.
- with self._error_log_lock:
- if msg in self.error_log:
- return
- self.error_log.add(msg)
+ try:
+ with self._error_log_lock:
+ if msg in self.error_log:
+ return
+ self.error_log.add(msg)
+ except TypeError:
+ # If the error_log_lock thread is dead, it just means that we are
+ # shutting down. The _alive value does not seem to be up to date
+ # in some cases ...
+ return
protocol.log(msg, x)
def __getInetAddress(self, host):
@@ -411,10 +438,10 @@ class LocatorService(locator.LocatorService):
# Cleanup peers table
stale_peers = None
- for p in self.peers.values():
+ for p in list(self.peers.values()):
if isinstance(p, peer.RemotePeer):
if p.getLastUpdateTime() + locator.DATA_RETENTION_PERIOD < tm:
- if stale_peers == None:
+ if stale_peers is None:
stale_peers = []
stale_peers.append(p)
if stale_peers is not None:
@@ -423,14 +450,14 @@ class LocatorService(locator.LocatorService):
# Try to become a master
port = self.socket.getsockname()[1]
- period = locator.DATA_RETENTION_PERIOD / 2
+ period = int(locator.DATA_RETENTION_PERIOD / 2)
if port != DISCOVEY_PORT and \
self.last_master_packet_time + period <= tm:
s0 = self.socket
s1 = None
try:
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s1.bind(DISCOVEY_PORT)
+ s1.bind((socket.gethostname(), DISCOVEY_PORT))
s1.setsockopt(socket.SOL_UDP, socket.SO_BROADCAST, 1)
self.socket = s1
s0.close()
@@ -471,12 +498,11 @@ class LocatorService(locator.LocatorService):
continue
self.subnets.add(s)
if __TRACE_DISCOVERY__:
- buf = cStringIO.StringIO()
- buf.write("Refreshed subnet list:")
+ buf = "Refreshed subnet list:"
for subnet in self.subnets:
- buf.write("\n\t* address=%s, broadcast=%s" % \
- (subnet.address, subnet.broadcast))
- logging.trace(buf.getvalue())
+ buf += "\n\t* address=%s, broadcast=%s" % \
+ (subnet.address, subnet.broadcast)
+ logging.trace(buf)
def __getAllIpAddresses(self):
import fcntl # @UnresolvedImport
@@ -485,13 +511,15 @@ class LocatorService(locator.LocatorService):
nBytes = 8192
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- names = array.array('B', '\0' * nBytes)
- ifcfg = struct.unpack('iL',
- fcntl.ioctl(s.fileno(), 0x8912,
- struct.pack('iL', nBytes,
- names.buffer_info()[0])))[0]
+ names = array.array('B', [0] * nBytes)
+ ifcfg = struct.unpack(
+ 'iL', fcntl.ioctl(s.fileno(), 0x8912,
+ struct.pack('iL', nBytes,
+ names.buffer_info()[0])))[0]
namestr = names.tostring()
+ if namestr and isinstance(namestr[0], int):
+ namestr = ''.join(chr(b) for b in namestr)
res = []
# the ipconf structure changed at a time, check if there are more than
@@ -513,7 +541,7 @@ class LocatorService(locator.LocatorService):
ipStartIx = ix + 20
ipEndIx = ix + 24
ip = namestr[ipStartIx:ipEndIx]
- res.append(str(ord(ip[0])) + '.' + str(ord(ip[1])) + '.' + \
+ res.append(str(ord(ip[0])) + '.' + str(ord(ip[1])) + '.' +
str(ord(ip[2])) + '.' + str(ord(ip[3])))
return (res)
@@ -523,21 +551,24 @@ class LocatorService(locator.LocatorService):
if len(self.addr_list) == 0:
# Create the list of IP address for this host
_, _, self.addr_list = socket.gethostbyname_ex(hostname)
- if not "127.0.0.1" in self.addr_list:
+ if "127.0.0.1" not in self.addr_list:
self.addr_list.append("127.0.0.1")
# On unix hosts, use sockets to get the other interfaces IPs
if (platform.system() != 'Windows'):
for ip_addr in self.__getAllIpAddresses():
- if not ip_addr in self.addr_list:
+ if ip_addr not in self.addr_list:
self.addr_list.append(ip_addr)
for address in self.addr_list:
rawaddr = socket.inet_aton(address)
if len(rawaddr) != 4:
continue
- rawaddr = rawaddr[:3] + '\xFF'
+ if isinstance(rawaddr, str):
+ rawaddr = rawaddr[:3] + '\xFF'
+ elif isinstance(rawaddr, bytes):
+ rawaddr = bytes([b for b in rawaddr[:3]] + [255])
broadcast = socket.inet_ntoa(rawaddr)
_set.add(SubNet(24, InetAddress(hostname, address),
InetAddress(None, broadcast)))
@@ -569,7 +600,7 @@ class LocatorService(locator.LocatorService):
addr == subnet.address:
return False
- self.socket.sendto(str(self.out_buf[:size]),
+ self.socket.sendto(compat.str2bytes(self.out_buf[:size]),
(addr.getHostAddress(), port))
if __TRACE_DISCOVERY__:
@@ -630,7 +661,7 @@ class LocatorService(locator.LocatorService):
"""
cnt = 0
attrs = {}
- s = data[8:size].decode("UTF-8")
+ s = data[8:].decode("UTF-8")
l = len(s)
i = 0
while i < l:
@@ -662,8 +693,11 @@ class LocatorService(locator.LocatorService):
for subnet in self.subnets:
if isinstance(_peer, peer.RemotePeer):
- if self.socket.getsockname()[1] != DISCOVEY_PORT:
- return
+ try:
+ if self.socket.getsockname()[1] != DISCOVEY_PORT:
+ return
+ except:
+ pass
if not subnet.address == self.loopback_addr and \
not subnet.address == peer_addr:
continue
@@ -671,13 +705,10 @@ class LocatorService(locator.LocatorService):
if not subnet.contains(peer_addr):
continue
if i == 8:
- sb = cStringIO.StringIO()
- for key in attrs.keys():
- sb.write(key)
- sb.write('=')
- sb.write(attrs.get(key))
- sb.write('\0')
- bt = self.__getUTF8Bytes(sb.getvalue())
+ sb = []
+ for key in list(attrs.keys()):
+ sb.append(str(key) + '=' + str(attrs.get(key)))
+ bt = self.__getUTF8Bytes('\0'.join(sb))
if i + len(bt) > len(self.out_buf):
return
self.out_buf[i:i + len(bt)] = bt
@@ -695,7 +726,7 @@ class LocatorService(locator.LocatorService):
def __sendAll(self, addr, port, sl, tm):
for subnet in self.subnets:
subnet.send_all_ok = False
- for peer in self.peers.values():
+ for peer in list(self.peers.values()):
self._sendPeerInfo(peer, addr, port)
if addr is not None and sl is not None and \
sl.last_req_slaves_time + locator.DATA_RETENTION_PERIOD >= tm:
@@ -750,7 +781,7 @@ class LocatorService(locator.LocatorService):
if i > 8 and i + len(bt) >= PREF_PACKET_SIZE:
self.__sendDatagramPacket(subnet, i, addr, port)
i = 8
- self.out_buf[i:i+len(bt)] = bt
+ self.out_buf[i:len(bt)] = bt
i += len(bt)
self.out_buf[i] = 0
i += 1
@@ -783,14 +814,14 @@ class LocatorService(locator.LocatorService):
remote_port = p.getPort()
remote_address = p.getAddress()
if self.__isRemote(remote_address, remote_port):
- code = ord(buf[4])
- if code == locator.CONF_PEERS_REMOVED:
+ if buf[4] == locator.CONF_PEERS_REMOVED:
self.__handlePeerRemovedPacket(p)
else:
sl = None
if remote_port != DISCOVEY_PORT:
sl = self.__addSlave(remote_address, remote_port, tm,
tm)
+ code = ord(buf[4])
if code == locator.CONF_PEER_INFO:
self.__handlePeerInfoPacket(p)
elif code == locator.CONF_REQ_INFO:
@@ -802,11 +833,11 @@ class LocatorService(locator.LocatorService):
for subnet in self.subnets:
if not subnet.contains(remote_address):
continue
- delay = locator.DATA_RETENTION_PERIOD / 3
+ delay = int(locator.DATA_RETENTION_PERIOD / 3)
if remote_port != DISCOVEY_PORT:
- delay = locator.DATA_RETENTION_PERIOD / 32
+ delay = int(locator.DATA_RETENTION_PERIOD / 32)
elif subnet.address != remote_address:
- delay = locator.DATA_RETENTION_PERIOD / 2
+ delay = int(locator.DATA_RETENTION_PERIOD / 2)
if subnet.last_slaves_req_time + delay <= tm:
self.__sendSlavesRequest(subnet, remote_address,
remote_port)
@@ -815,7 +846,7 @@ class LocatorService(locator.LocatorService):
remote_port == DISCOVEY_PORT:
self.last_master_packet_time = tm
except Exception as x:
- self._log("Invalid datagram packet received from %s/%s" % \
+ self._log("Invalid datagram packet received from %s/%s" %
(p.getAddress(), p.getPort()), x)
def __handlePeerInfoPacket(self, p):
@@ -843,7 +874,7 @@ class LocatorService(locator.LocatorService):
elif _peer is None:
peer.RemotePeer(attrs)
except Exception as x:
- self._log("Invalid datagram packet received from %s/%s" % \
+ self._log("Invalid datagram packet received from %s/%s" %
(p.getAddress(), p.getPort()), x)
def __handleReqInfoPacket(self, p, sl, tm):
@@ -856,7 +887,7 @@ class LocatorService(locator.LocatorService):
attrs = self.__parseIDs(p.getData(), p.getLength())
if __TRACE_DISCOVERY__:
self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", attrs, p)
- for s in attrs.values():
+ for s in list(attrs.values()):
i = 0
l = len(s)
time0 = i
@@ -888,9 +919,9 @@ class LocatorService(locator.LocatorService):
time_val = time_now
if time_val < 3600000:
# Time stamp is "time to live" in milliseconds
- time_val = time_now + time_val / 1000 - \
- locator.DATA_RETENTION_PERIOD
- elif time_val < time_now / 1000 + 50000000:
+ time_val = time_now + int(time_val / 1000) - \
+ locator.DATA_RETENTION_PERIOD
+ elif time_val < int(time_now / 1000) + 50000000:
# Time stamp is in seconds
time_val = 1000
else:
@@ -899,17 +930,17 @@ class LocatorService(locator.LocatorService):
if time_val < time_now - delta or \
time_val > time_now + delta:
msg = "Invalid slave info timestamp: %s -> %s" % (
- timestamp,
- time.strftime("%Y-%m-%d %H:%M:%S",
- time.localtime(time_val /
- 1000.)))
+ timestamp,
+ time.strftime("%Y-%m-%d %H:%M:%S",
+ time.localtime(time_val /
+ 1000.)))
self._log("Invalid datagram packet received " +
- "from %s/%s" % (
- p.getAddress(), p.getPort()),
+ "from %s/%s" % (p.getAddress(),
+ p.getPort()),
Exception(msg))
time_val = time_now - \
- locator.DATA_RETENTION_PERIOD / 2
+ int(locator.DATA_RETENTION_PERIOD / 2)
self.__addSlave(addr, port, time_val, time_now)
except Exception as x:
self._log("Invalid datagram packet received from " +
@@ -928,7 +959,7 @@ class LocatorService(locator.LocatorService):
if __TRACE_DISCOVERY__:
self.__traceDiscoveryPacket(True, "CONF_PEERS_REMOVED", attrs,
p)
- for _id in attrs.values():
+ for _id in list(attrs.values()):
_peer = self.peers.get(_id)
if isinstance(_peer, peer.RemotePeer):
_peer.dispose()
@@ -985,14 +1016,13 @@ class LocatorService(locator.LocatorService):
# addr is a InputPacket
port = addr.getPort()
addr = addr.getAddress()
- buf = cStringIO.StringIO()
- buf.write(packet_type)
- buf.write((" sent to ", " received from ")[received])
- buf.write("%s/%s" % (addr, port))
+ buf = str(packet_type)
+ buf += (" sent to ", " received from ")[received]
+ buf += "%s/%s" % (addr, port)
if attrs is not None:
- for key, value in attrs.items():
- buf.write("\n\t%s=%s" % (key, value))
- logging.trace(buf.getvalue())
+ for key, value in list(attrs.items()):
+ buf += "\n\t%s=%s" % (key, value)
+ logging.trace(buf)
class LocatorServiceProvider(services.ServiceProvider):

Back to the top