diff options
Diffstat (limited to 'python/src/tcf/services/local/LocatorService.py')
-rw-r--r-- | python/src/tcf/services/local/LocatorService.py | 1614 |
1 files changed, 807 insertions, 807 deletions
diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py index 5d8f43bc2..5a9271548 100644 --- a/python/src/tcf/services/local/LocatorService.py +++ b/python/src/tcf/services/local/LocatorService.py @@ -1,807 +1,807 @@ -# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
-# * All rights reserved. This program and the accompanying materials
-# * are made available under the terms of the Eclipse Public License v1.0
-# * which accompanies this distribution, and is available at
-# * http://www.eclipse.org/legal/epl-v10.html
-# *
-# * Contributors:
-# * Wind River Systems - initial API and implementation
-# *******************************************************************************
-
-"""
-Locator service uses transport layer to search
-for peers and to collect and maintain up-to-date
-data about peer's attributes.
-"""
-
-import threading, time, socket, cStringIO
-from tcf.services import locator
-from tcf.util import logging
-from tcf.channel import fromJSONSequence, toJSONSequence
-from tcf.channel.ChannelProxy import ChannelProxy
-from tcf import protocol, services, channel, peer, errors
-
-# Flag indicating whether tracing of the the discovery activity is enabled.
-__TRACE_DISCOVERY__ = False
-
-class SubNet(object):
- def __init__(self, prefix_length, address, broadcast):
- self.prefix_length = prefix_length
- self.address = address
- self.broadcast = broadcast
- self.last_slaves_req_time = 0
- def contains(self, addr):
- if addr is None or self.address is None: return False
- a1 = addr.getAddress()
- a2 = self.address.getAddress()
- if len(a1) != len(a2): return False
- i = 0
- if self.prefix_length <= len(a1) * 8:
- l = self.prefix_length
- else:
- l = len(a1) * 8
- while i + 8 <= l:
- n = i / 8
- if a1[n] != a2[n]: return False
- i += 8
- while i < l:
- n = i / 8
- m = 1 << (7 - i % 8)
- if (a1[n] & m) != (a2[n] & m): return False
- i += 1
- return True
- def __eq__(self, o):
- if not isinstance(o, SubNet): return False
- return self.prefix_length == o.prefix_length and \
- self.broadcast == o.broadcast and \
- self.address == o.address
- def __hash__(self):
- return hash(self.address)
- def __str__(self):
- return "%s/%d" % (self.address.getHostAddress(), self.prefix_length)
-
-class Slave(object):
- # Time of last packet receiver from self slave
- last_packet_time = 0
- #Time of last REQ_SLAVES packet received from self slave
- last_req_slaves_time = 0
-
- def __init__(self, address, port):
- self.address = address
- self.port = port
- def __str__(self):
- return "%s/%d" % (self.address.getHostAddress(), self.port)
-
-class AddressCacheItem(object):
- address = None
- time_stamp = 0
- used = False
- def __init__(self, host):
- self.host = host
-
-class InetAddress(object):
- "Mimicking Java InetAddress class"
- def __init__(self, host, addr):
- self.host = host
- self.addr = addr
- def getAddress(self):
- return socket.inet_aton(self.addr)
- def getHostAddress(self):
- return self.addr
- def __eq__(self, other):
- if not isinstance(other, InetAddress): return False
- return self.addr == other.addr
- def __str__(self):
- return "%s/%s" % (self.host or "", self.addr)
-
-class InputPacket(object):
- "Wrapper for UDP packet data."
- def __init__(self, data, addr, port):
- self.data = data
- self.addr = addr
- self.port = port
- def getLength(self):
- return len(self.data)
- def getData(self):
- return self.data
- def getPort(self):
- return self.port
- def getAddress(self):
- return self.addr
- def __str__(self):
- return "[address=%s,port=%d,data=\"%s\"]" % (self.getAddress(), self.getPort(), self.data)
-
-DISCOVEY_PORT = 1534
-MAX_PACKET_SIZE = 9000 - 40 - 8
-PREF_PACKET_SIZE = 1500 - 40 - 8
-
-# TODO: research usage of DNS-SD (DNS Service Discovery) to discover TCF peers
-class LocatorService(locator.LocatorService):
- locator = None
- peers = {} # str->Peer
- listeners = [] # list of LocatorListener
- error_log = set() # set of str
-
- addr_cache = {} # str->AddressCacheItem
- addr_request = False
- local_peer = None
- last_master_packet_time = 0
-
- @classmethod
- def getLocalPeer(cls):
- return cls.local_peer
-
- @classmethod
- def getListeners(cls):
- return cls.listeners[:]
-
- @classmethod
- def startup(cls):
- if cls.locator:
- cls.locator._startup()
-
- @classmethod
- def shutdown(cls):
- if cls.locator:
- cls.locator._shutdown()
-
- def __init__(self):
- self._error_log_lock = threading.RLock()
- self._alive = False
- LocatorService.locator = self
- LocatorService.local_peer = peer.LocalPeer()
-
- def _startup(self):
- if self._alive: return
- self._alive = True
- self._addr_cache_lock = threading.Condition()
- self.subnets = set()
- self.slaves = []
- self.inp_buf = bytearray(MAX_PACKET_SIZE)
- self.out_buf = bytearray(MAX_PACKET_SIZE)
- service = self
- class TimerThread(threading.Thread):
- def __init__(self, callable):
- self._callable = callable
- super(TimerThread, self).__init__()
- def run(self):
- while service._alive:
- try:
- time.sleep(locator.DATA_RETENTION_PERIOD / 4 / 1000.)
- protocol.invokeAndWait(self._callable)
- except RuntimeError:
- # TCF event dispatch is shut down
- return
- except Exception as x:
- service._log("Unhandled exception in TCF discovery timer thread", x)
- self.timer_thread = TimerThread(self.__refresh_timer)
- class DNSLookupThread(threading.Thread):
- def run(self):
- while service._alive:
- try:
- itemSet = None
- with service._addr_cache_lock:
- if not LocatorService.addr_request:
- service._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD)
- msec = int(time.time() * 1000)
- for host, a in LocatorService.addr_cache.items():
- if a.time_stamp + locator.DATA_RETENTION_PERIOD * 10 < msec:
- if a.used:
- if itemSet is None: itemSet = set()
- itemSet.add(a)
- else:
- del LocatorService.addr_cache[host]
- LocatorService.addr_request = False
- if itemSet is not None:
- for a in itemSet:
- addr = None
- try:
- addr = socket.gethostbyname(a.host)
- except socket.gaierror:
- pass
- with service._addr_cache_lock:
- if addr is None:
- a.address = None
- else:
- a.address = InetAddress(a.host, addr)
- a.time_stamp = msec
- a.used = False
- except Exception as x:
- service._log("Unhandled exception in TCF discovery DNS lookup thread", x)
- self.dns_lookup_thread = DNSLookupThread()
- class InputThread(threading.Thread):
- def __init__(self, callable):
- self._callable = callable
- super(InputThread, self).__init__()
- def run(self):
- try:
- while service._alive:
- sock = service.socket
- try:
- data, addr = sock.recvfrom(MAX_PACKET_SIZE)
- p = InputPacket(data, InetAddress(None, addr[0]), addr[1])
- protocol.invokeAndWait(self._callable, p)
- except RuntimeError:
- # TCF event dispatch is shutdown
- return
- except socket.error as x:
- if sock != service.socket: continue
- # frequent error on windows, unknown reason
- if x.errno == 10054: continue
- port = sock.getsockname()[1]
- service._log("Cannot read from datagram socket at port %d" % port, x)
- time.sleep(2)
- except Exception as x:
- service._log("Unhandled exception in socket reading thread", x)
- self.input_thread = InputThread(self.__handleDatagramPacket)
- try:
- self.loopback_addr = InetAddress(None, "127.0.0.1")
- self.out_buf[0:8] = 'TCF%s\0\0\0\0' % locator.CONF_VERSION
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- try:
- self.socket.bind(('', DISCOVEY_PORT))
- if __TRACE_DISCOVERY__:
- logging.trace("Became the master agent (bound to port %d)" % self.socket.getsockname()[1])
- except socket.error as x:
- self.socket.bind(('', 0))
- if __TRACE_DISCOVERY__:
- logging.trace("Became a slave agent (bound to port %d)" % self.socket.getsockname()[1])
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
- self.input_thread.setName("TCF Locator Receiver")
- self.timer_thread.setName("TCF Locator Timer")
- self.dns_lookup_thread.setName("TCF Locator DNS Lookup")
- self.input_thread.setDaemon(True)
- self.timer_thread.setDaemon(True)
- self.dns_lookup_thread.setDaemon(True)
- self.input_thread.start()
- self.timer_thread.start()
- self.dns_lookup_thread.start()
- class LocatorListener(locator.LocatorListener):
- def peerAdded(self, peer):
- service._sendPeerInfo(peer, None, 0)
- def peerChanged(self, peer):
- service._sendPeerInfo(peer, None, 0)
- self.listeners.append(LocatorListener())
- self.__refreshSubNetList()
- self.__sendPeersRequest(None, 0)
- self.__sendAll(None, 0, None, int(time.time() * 1000))
- except Exception as x:
- self._log("Cannot open UDP socket for TCF discovery protocol", x)
-
- def _shutdown(self):
- if self._alive:
- self._alive = False
-
- def __makeErrorReport(self, code, msg):
- err = {}
- err[errors.ERROR_TIME] = int(time.time() * 1000)
- err[errors.ERROR_CODE] = code
- err[errors.ERROR_FORMAT] = msg
- return err
-
- def __command(self, channel, token, name, data):
- try:
- if name == "redirect":
- peer_id = fromJSONSequence(data)[0]
- _peer = self.peers.get(peer_id)
- if _peer is None:
- error = self.__makeErrorReport(errors.TCF_ERROR_UNKNOWN_PEER, "Unknown peer ID")
- channel.sendResult(token, toJSONSequence((error,)))
- return
- channel.sendResult(token, toJSONSequence((None,)))
- if isinstance(_peer, peer.LocalPeer):
- channel.sendEvent(protocol.getLocator(), "Hello", toJSONSequence((channel.getLocalServices(),)))
- return
- ChannelProxy(channel, _peer.openChannel())
- elif name == "sync":
- channel.sendResult(token, None)
- elif name == "getPeers":
- arr = []
- for p in self.peers.values():
- arr.append(p.getAttributes())
- channel.sendResult(token, toJSONSequence((None, arr)))
- else:
- channel.rejectCommand(token)
- except Exception as x:
- channel.terminate(x)
-
- def _log(self, msg, x):
- if not self._alive: return
- # Don't report same error multiple times to avoid filling up the log file.
- with self._error_log_lock:
- if msg in self.error_log: return
- self.error_log.add(msg)
- protocol.log(msg, x)
-
- def __getInetAddress(self, host):
- if not host: return None
- with self._addr_cache_lock:
- i = self.addr_cache.get(host)
- if i is None:
- i = AddressCacheItem(host)
- ch = host[0]
- if ch == '[' or ch == ':' or ch >= '0' and ch <= '9':
- try:
- addr = socket.gethostbyname(host)
- i.address = InetAddress(host, addr)
- except socket.gaierror:
- pass
- i.time_stamp = int(time.time() * 1000)
- else:
- # socket.gethostbyname() can cause long delay - delegate to background thread
- LocatorService.addr_request = True
- self._addr_cache_lock.notify()
- self.addr_cache[host] = i
- i.used = True
- return i.address
-
- def __refresh_timer(self):
- tm = int(time.time() * 1000)
- # Cleanup slave table
- if self.slaves:
- i = 0
- while i < len(self.slaves):
- s = self.slaves[i]
- if s.last_packet_time + locator.DATA_RETENTION_PERIOD < tm:
- del self.slaves[i]
- else:
- i += 1
-
- # Cleanup peers table
- stale_peers = None
- for p in self.peers.values():
- if isinstance(p, peer.RemotePeer):
- if p.getLastUpdateTime() + locator.DATA_RETENTION_PERIOD < tm:
- if stale_peers == None: stale_peers = []
- stale_peers.append(p)
- if stale_peers is not None:
- for p in stale_peers: p.dispose()
-
- # Try to become a master
- port = self.socket.getsockname()[1]
- if port != DISCOVEY_PORT and \
- self.last_master_packet_time + locator.DATA_RETENTION_PERIOD / 2 <= tm:
- s0 = self.socket
- s1 = None
- try:
- s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s1.bind(DISCOVEY_PORT)
- s1.setsockopt(socket.SOL_UDP, socket.SO_BROADCAST, 1)
- self.socket = s1
- s0.close()
- except:
- pass
- self.__refreshSubNetList()
- if port != DISCOVEY_PORT:
- for subnet in self.subnets:
- self.__addSlave(subnet.address, port, tm, tm)
- self.__sendAll(None, 0, None, tm)
-
- def __addSlave(self, addr, port, timestamp, time_now):
- for s in self.slaves:
- if s.port == port and s.address == addr:
- if s.last_packet_time < timestamp: s.last_packet_time = timestamp
- return s
- s = Slave(addr, port)
- s.last_packet_time = timestamp
- self.slaves.append(s)
- self.__sendPeersRequest(addr, port)
- self.__sendAll(addr, port, s, time_now)
- self.__sendSlaveInfo(s, time_now)
- return s
-
- def __refreshSubNetList(self):
- subNetSet = set()
- try:
- self.__getSubNetList(subNetSet)
- except Exception as x:
- self._log("Cannot get list of network interfaces", x)
- for s in tuple(self.subnets):
- if s in subNetSet: continue
- self.subnets.remove(s)
- for s in subNetSet:
- if s in self.subnets: continue
- self.subnets.add(s)
- if __TRACE_DISCOVERY__:
- str = cStringIO.StringIO()
- str.write("Refreshed subnet list:")
- for subnet in self.subnets:
- str.write("\n\t* address=%s, broadcast=%s" % (subnet.address, subnet.broadcast))
- logging.trace(str.getvalue())
-
- def __getSubNetList(self, set):
- # TODO iterate over network interfaces to get proper broadcast addresses
- hostname = socket.gethostname()
- _, _, addresses = socket.gethostbyname_ex(hostname)
- if not "127.0.0.1" in addresses:
- addresses.append("127.0.0.1")
- for address in addresses:
- rawaddr = socket.inet_aton(address)
- if len(rawaddr) != 4: continue
- rawaddr = rawaddr[:3] + '\xFF'
- broadcast = socket.inet_ntoa(rawaddr)
- set.add(SubNet(24, InetAddress(hostname, address), InetAddress(None, broadcast)))
-
- def __getUTF8Bytes(self, s):
- return s.encode("UTF-8")
-
- # Used for tracing
- packetTypes = [
- None,
- "CONF_REQ_INFO",
- "CONF_PEER_INFO",
- "CONF_REQ_SLAVES",
- "CONF_SLAVES_INFO",
- "CONF_PEERS_REMOVED"
- ]
-
- def __sendDatagramPacket(self, subnet, size, addr, port):
- try:
- if addr is None:
- addr = subnet.broadcast
- port = DISCOVEY_PORT
- for slave in self.slaves:
- self.__sendDatagramPacket(subnet, size, slave.address, slave.port)
- if not subnet.contains(addr): return False
- if port == self.socket.getsockname()[1] and addr == subnet.address: return False
- self.socket.sendto(str(self.out_buf[:size]), (addr.getHostAddress(), port))
-
- if __TRACE_DISCOVERY__:
- map = None
- if self.out_buf[4] == locator.CONF_PEER_INFO:
- map = self.__parsePeerAttributes(self.out_buf, 8)
- elif self.out_buf[4] == locator.CONF_SLAVES_INFO:
- map = self.__parseIDs(self.out_buf, size)
- elif self.out_buf[4] == locator.CONF_PEERS_REMOVED:
- map = self.__parseIDs(self.out_buf, size)
- self.__traceDiscoveryPacket(False, self.packetTypes[self.out_buf[4]], map, addr, port)
- except Exception as x:
- self._log("Cannot send datagram packet to %s" % addr, x)
- return False
- return True
-
- def __parsePeerAttributes(self, data, size):
- """
- Parse peer attributes in CONF_PEER_INFO packet data
-
- @param data - the packet data
- @param size - the packet size
- @return a map containing the attributes
- """
- map = {}
- s = data[8:size - 8].decode("UTF-8")
- l = len(s)
- i = 0
- while i < l:
- i0 = i
- while i < l and s[i] != '=' and s[i] != '\0': i += 1
- i1 = i
- if i < l and s[i] == '=': i += 1
- i2 = i
- while i < l and s[i] != '\0': i += 1
- i3 = i
- if i < l and s[i] == '\0': i += 1
- key = s[i0:i1]
- val = s[i2:i3]
- map[key] = val
- return map
-
- def __parseIDs(self, data, size):
- """
- Parse list of IDs in CONF_SLAVES_INFO and CONF_PEERS_REMOVED packet data.
-
- @param data - the packet data
- @param size - the packet size
- @return a map containing the IDs
- """
- cnt = 0
- map = {}
- s = data[8:size - 8].decode("UTF-8")
- l = len(s)
- i = 0
- while i < l:
- i0 = i
- while i < l and s[i] != '\0': i += 1
- if i > i0:
- id = s[i0:i]
- map[str(cnt)] = id
- cnt += 1
- while i < l and s[i] == '\0': i += 1
- return map
-
- def __sendPeersRequest(self, addr, port):
- self.out_buf[4] = locator.CONF_REQ_INFO
- for subnet in self.subnets:
- self.__sendDatagramPacket(subnet, 8, addr, port)
-
- def _sendPeerInfo(self, _peer, addr, port):
- attrs = _peer.getAttributes()
- peer_addr = self.__getInetAddress(attrs.get(peer.ATTR_IP_HOST))
- if peer_addr is None: return
- if attrs.get(peer.ATTR_IP_PORT) is None: return
- self.out_buf[4] = locator.CONF_PEER_INFO
- i = 8
-
- for subnet in self.subnets:
- if isinstance(_peer, peer.RemotePeer):
- if self.socket.getsockname()[1] != DISCOVEY_PORT: return
- if not subnet.address == self.loopback_addr and not subnet.address == peer_addr: continue
- if not subnet.address == self.loopback_addr:
- if not subnet.contains(peer_addr): continue
- if i == 8:
- sb = cStringIO.StringIO()
- for key in attrs.keys():
- sb.write(key)
- sb.write('=')
- sb.write(attrs.get(key))
- sb.write('\0')
- bt = self.__getUTF8Bytes(sb.getvalue())
- if i + len(bt) > len(self.out_buf): return
- self.out_buf[i:i+len(bt)] = bt
- i += len(bt)
- if self.__sendDatagramPacket(subnet, i, addr, port): subnet.send_all_ok = True
-
- def __sendEmptyPacket(self, addr, port):
- self.out_buf[4] = locator.CONF_SLAVES_INFO
- for subnet in self.subnets:
- if subnet.send_all_ok: continue
- self.__sendDatagramPacket(subnet, 8, addr, port)
-
- def __sendAll(self, addr, port, sl, tm):
- for subnet in self.subnets: subnet.send_all_ok = False
- for peer in self.peers.values(): self._sendPeerInfo(peer, addr, port)
- if addr is not None and sl is not None and sl.last_req_slaves_time + locator.DATA_RETENTION_PERIOD >= tm:
- self.__sendSlavesInfo(addr, port, tm)
- self.__sendEmptyPacket(addr, port)
-
- def __sendSlavesRequest(self, subnet, addr, port):
- self.out_buf[4] = locator.CONF_REQ_SLAVES
- self.__sendDatagramPacket(subnet, 8, addr, port)
-
- def __sendSlaveInfo(self, x, tm):
- ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm
- if ttl <= 0: return
- self.out_buf[4] = locator.CONF_SLAVES_INFO
- for subnet in self.subnets:
- if not subnet.contains(x.address): continue
- i = 8
- s = "%d:%s:%s" % (ttl, x.port, x.address.getHostAddress())
- bt = self.__getUTF8Bytes(s)
- self.out_buf[i:i+len(bt)] = bt
- i += len(bt)
- self.out_buf[i] = 0
- i += 1
- for y in self.slaves:
- if not subnet.contains(y.address): continue
- if y.last_req_slaves_time + locator.DATA_RETENTION_PERIOD < tm: continue
- self.__sendDatagramPacket(subnet, i, y.address, y.port)
-
- def __sendSlavesInfo(self, addr, port, tm):
- self.out_buf[4] = locator.CONF_SLAVES_INFO
- for subnet in self.subnets:
- if not subnet.contains(addr): continue
- i = 8
- for x in self.slaves:
- ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm
- if ttl <= 0: return
- if x.port == port and x.address == addr: continue
- if not subnet.address == self.loopback_addr:
- if not subnet.contains(x.address): continue
- subnet.send_all_ok = True
- s = "%d:%s:%s" % (x.last_packet_time, x.port, x.address.getHostAddress())
- bt = self.__getUTF8Bytes(s)
- if i > 8 and i + len(bt) >= PREF_PACKET_SIZE:
- self.__sendDatagramPacket(subnet, i, addr, port)
- i = 8
- self.out_buf[i:len(bt)] = bt
- i += len(bt)
- self.out_buf[i] = 0
- i += 1
- if i > 8: self.__sendDatagramPacket(subnet, i, addr, port)
-
- def __isRemote(self, address, port):
- if port != self.socket.getsockname()[1]: return True
- for s in self.subnets:
- if s.address == address: return False
- return True
-
- def __handleDatagramPacket(self, p):
- try:
- tm = int(time.time() * 1000)
- buf = p.getData()
- len = p.getLength()
- if len < 8: return
- if buf[0] != 'T': return
- if buf[1] != 'C': return
- if buf[2] != 'F': return
- if buf[3] != locator.CONF_VERSION: return
- remote_port = p.getPort()
- remote_address = p.getAddress()
- if self.__isRemote(remote_address, remote_port):
- if buf[4] == locator.CONF_PEERS_REMOVED:
- self.__handlePeerRemovedPacket(p)
- else:
- sl = None
- if remote_port != DISCOVEY_PORT:
- sl = self.__addSlave(remote_address, remote_port, tm, tm)
- code = ord(buf[4])
- if code == locator.CONF_PEER_INFO:
- self.__handlePeerInfoPacket(p)
- elif code == locator.CONF_REQ_INFO:
- self.__handleReqInfoPacket(p, sl, tm)
- elif code == locator.CONF_SLAVES_INFO:
- self.__handleSlavesInfoPacket(p, tm)
- elif code == locator.CONF_REQ_SLAVES:
- self.__handleReqSlavesPacket(p, sl, tm)
- for subnet in self.subnets:
- if not subnet.contains(remote_address): continue
- delay = locator.DATA_RETENTION_PERIOD / 3
- if remote_port != DISCOVEY_PORT: delay = locator.DATA_RETENTION_PERIOD / 32
- elif subnet.address != remote_address: delay = locator.DATA_RETENTION_PERIOD / 2
- if subnet.last_slaves_req_time + delay <= tm:
- self.__sendSlavesRequest(subnet, remote_address, remote_port)
- subnet.last_slaves_req_time = tm
- if subnet.address == remote_address and remote_port == DISCOVEY_PORT:
- self.last_master_packet_time = tm
- except Exception as x:
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
-
- def __handlePeerInfoPacket(self, p):
- try:
- map = self.__parsePeerAttributes(p.getData(), p.getLength())
- if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_PEER_INFO", map, p)
- id = map.get(peer.ATTR_ID)
- if id is None: raise RuntimeError("Invalid peer info: no ID")
- ok = True
- host = map.get(peer.ATTR_IP_HOST)
- if host is not None:
- ok = False
- peer_addr = self.__getInetAddress(host)
- if peer_addr is not None:
- for subnet in self.subnets:
- if subnet.contains(peer_addr):
- ok = True
- break
- if ok:
- _peer = self.peers.get(id)
- if isinstance(_peer, peer.RemotePeer):
- _peer.updateAttributes(map)
- elif _peer is None:
- peer.RemotePeer(map)
- except Exception as x:
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
-
- def __handleReqInfoPacket(self, p, sl, tm):
- if __TRACE_DISCOVERY__:
- self.__traceDiscoveryPacket(True, "CONF_REQ_INFO", None, p)
- self.__sendAll(p.getAddress(), p.getPort(), sl, tm)
-
- def __handleSlavesInfoPacket(self, p, time_now):
- try:
- map = self.__parseIDs(p.getData(), p.getLength())
- if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", map, p)
- for s in map.values():
- i = 0
- l = len(s)
- time0 = i
- while i < l and s[i] != ':' and s[i] != '\0': i += 1
- time1 = i
- if i < l and s[i] == ':': i += 1
- port0 = i
- while i < l and s[i] != ':' and s[i] != '\0': i += 1
- port1 = i
- if i < l and s[i] == ':': i += 1
- host0 = i
- while i < l and s[i] != '\0': i += 1
- host1 = i
- port = int(s[port0:port1])
- timestamp = s[time0:time1]
- host = s[host0:host1]
- if port != DISCOVEY_PORT:
- addr = self.__getInetAddress(host)
- if addr is not None:
- delta = 10006030 # 30 minutes
- if len(timestamp) > 0:
- time_val = int(timestamp)
- else:
- time_val = time_now
- if time_val < 3600000:
- # Time stamp is "time to live" in milliseconds
- time_val = time_now + time_val / 1000 - locator.DATA_RETENTION_PERIOD
- elif time_val < time_now / 1000 + 50000000:
- # Time stamp is in seconds
- time_val= 1000
- else:
- # Time stamp is in milliseconds
- pass
- if time_val < time_now - delta or time_val > time_now + delta:
- msg = "Invalid slave info timestamp: %s -> %s" % (
- timestamp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_val/1000.)))
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), Exception(msg))
- time_val = time_now - locator.DATA_RETENTION_PERIOD / 2
- self.__addSlave(addr, port, time_val, time_now)
- except Exception as x:
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
-
- def __handleReqSlavesPacket(self, p, sl, tm):
- if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p)
- if sl is not None: sl.last_req_slaves_time = tm
- self.__sendSlavesInfo(p.getAddress(), p.getPort(), tm)
-
- def __handlePeerRemovedPacket(self, p):
- try:
- map = self.__parseIDs(p.getData(), p.getLength())
- if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_PEERS_REMOVED", map, p)
- for id in map.values():
- _peer = self.peers.get(id)
- if isinstance(_peer, peer.RemotePeer): _peer.dispose()
- except Exception as x:
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
-
-
- @classmethod
- def getLocator(cls):
- return cls.locator
-
- def getPeers(self):
- assert protocol.isDispatchThread()
- return self.peers
-
- def redirect(self, peer, done):
- raise RuntimeError("Channel redirect cannot be done on local peer")
-
- def sync(self, done):
- raise RuntimeError("Channel sync cannot be done on local peer")
-
- def addListener(self, listener):
- assert listener is not None
- assert protocol.isDispatchThread()
- self.listeners.append(listener)
-
- def removeListener(self, listener):
- assert protocol.isDispatchThread()
- self.listeners.remove(listener)
-
- @classmethod
- def __traceDiscoveryPacket(cls, received, type, attrs, addr, port=None):
- """
- Log that a TCF Discovery packet has be sent or received. The trace is
- sent to stdout. This should be called only if the tracing has been turned
- on.
-
- @param received
- True if the packet was sent, otherwise it was received
- @param type
- a string specifying the type of packet, e.g., "CONF_PEER_INFO"
- @param attrs
- a set of attributes relevant to the type of packet (typically
- a peer's attributes)
- @param addr
- the network address the packet is being sent to
- @param port
- the port the packet is being sent to
- """
- assert __TRACE_DISCOVERY__
- if port is None:
- # addr is a InputPacket
- port = addr.getPort()
- addr = addr.getAddress()
- str = cStringIO.StringIO()
- str.write(type)
- str.write((" sent to ", " received from ")[received])
- str.write("%s/%s" % (addr, port))
- if attrs is not None:
- for key, value in attrs.items():
- str.write("\n\t%s=%s" % (key, value))
- logging.trace(str.getvalue())
-
-class LocatorServiceProvider(services.ServiceProvider):
- def getLocalService(self, _channel):
- class CommandServer(channel.CommandServer):
- def command(self, token, name, data):
- LocatorService.locator.command(channel, token, name, data)
- _channel.addCommandServer(LocatorService.locator, CommandServer())
- return (LocatorService.locator,)
-
-services.addServiceProvider(LocatorServiceProvider())
+# ******************************************************************************* +# * Copyright (c) 2011 Wind River Systems, Inc. and others. +# * All rights reserved. This program and the accompanying materials +# * are made available under the terms of the Eclipse Public License v1.0 +# * which accompanies this distribution, and is available at +# * http://www.eclipse.org/legal/epl-v10.html +# * +# * Contributors: +# * Wind River Systems - initial API and implementation +# ******************************************************************************* + +""" +Locator service uses transport layer to search +for peers and to collect and maintain up-to-date +data about peer's attributes. +""" + +import threading, time, socket, cStringIO +from tcf.services import locator +from tcf.util import logging +from tcf.channel import fromJSONSequence, toJSONSequence +from tcf.channel.ChannelProxy import ChannelProxy +from tcf import protocol, services, channel, peer, errors + +# Flag indicating whether tracing of the the discovery activity is enabled. +__TRACE_DISCOVERY__ = False + +class SubNet(object): + def __init__(self, prefix_length, address, broadcast): + self.prefix_length = prefix_length + self.address = address + self.broadcast = broadcast + self.last_slaves_req_time = 0 + def contains(self, addr): + if addr is None or self.address is None: return False + a1 = addr.getAddress() + a2 = self.address.getAddress() + if len(a1) != len(a2): return False + i = 0 + if self.prefix_length <= len(a1) * 8: + l = self.prefix_length + else: + l = len(a1) * 8 + while i + 8 <= l: + n = i / 8 + if a1[n] != a2[n]: return False + i += 8 + while i < l: + n = i / 8 + m = 1 << (7 - i % 8) + if (a1[n] & m) != (a2[n] & m): return False + i += 1 + return True + def __eq__(self, o): + if not isinstance(o, SubNet): return False + return self.prefix_length == o.prefix_length and \ + self.broadcast == o.broadcast and \ + self.address == o.address + def __hash__(self): + return hash(self.address) + def __str__(self): + return "%s/%d" % (self.address.getHostAddress(), self.prefix_length) + +class Slave(object): + # Time of last packet receiver from self slave + last_packet_time = 0 + #Time of last REQ_SLAVES packet received from self slave + last_req_slaves_time = 0 + + def __init__(self, address, port): + self.address = address + self.port = port + def __str__(self): + return "%s/%d" % (self.address.getHostAddress(), self.port) + +class AddressCacheItem(object): + address = None + time_stamp = 0 + used = False + def __init__(self, host): + self.host = host + +class InetAddress(object): + "Mimicking Java InetAddress class" + def __init__(self, host, addr): + self.host = host + self.addr = addr + def getAddress(self): + return socket.inet_aton(self.addr) + def getHostAddress(self): + return self.addr + def __eq__(self, other): + if not isinstance(other, InetAddress): return False + return self.addr == other.addr + def __str__(self): + return "%s/%s" % (self.host or "", self.addr) + +class InputPacket(object): + "Wrapper for UDP packet data." + def __init__(self, data, addr, port): + self.data = data + self.addr = addr + self.port = port + def getLength(self): + return len(self.data) + def getData(self): + return self.data + def getPort(self): + return self.port + def getAddress(self): + return self.addr + def __str__(self): + return "[address=%s,port=%d,data=\"%s\"]" % (self.getAddress(), self.getPort(), self.data) + +DISCOVEY_PORT = 1534 +MAX_PACKET_SIZE = 9000 - 40 - 8 +PREF_PACKET_SIZE = 1500 - 40 - 8 + +# TODO: research usage of DNS-SD (DNS Service Discovery) to discover TCF peers +class LocatorService(locator.LocatorService): + locator = None + peers = {} # str->Peer + listeners = [] # list of LocatorListener + error_log = set() # set of str + + addr_cache = {} # str->AddressCacheItem + addr_request = False + local_peer = None + last_master_packet_time = 0 + + @classmethod + def getLocalPeer(cls): + return cls.local_peer + + @classmethod + def getListeners(cls): + return cls.listeners[:] + + @classmethod + def startup(cls): + if cls.locator: + cls.locator._startup() + + @classmethod + def shutdown(cls): + if cls.locator: + cls.locator._shutdown() + + def __init__(self): + self._error_log_lock = threading.RLock() + self._alive = False + LocatorService.locator = self + LocatorService.local_peer = peer.LocalPeer() + + def _startup(self): + if self._alive: return + self._alive = True + self._addr_cache_lock = threading.Condition() + self.subnets = set() + self.slaves = [] + self.inp_buf = bytearray(MAX_PACKET_SIZE) + self.out_buf = bytearray(MAX_PACKET_SIZE) + service = self + class TimerThread(threading.Thread): + def __init__(self, callable): + self._callable = callable + super(TimerThread, self).__init__() + def run(self): + while service._alive: + try: + time.sleep(locator.DATA_RETENTION_PERIOD / 4 / 1000.) + protocol.invokeAndWait(self._callable) + except RuntimeError: + # TCF event dispatch is shut down + return + except Exception as x: + service._log("Unhandled exception in TCF discovery timer thread", x) + self.timer_thread = TimerThread(self.__refresh_timer) + class DNSLookupThread(threading.Thread): + def run(self): + while service._alive: + try: + itemSet = None + with service._addr_cache_lock: + if not LocatorService.addr_request: + service._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD) + msec = int(time.time() * 1000) + for host, a in LocatorService.addr_cache.items(): + if a.time_stamp + locator.DATA_RETENTION_PERIOD * 10 < msec: + if a.used: + if itemSet is None: itemSet = set() + itemSet.add(a) + else: + del LocatorService.addr_cache[host] + LocatorService.addr_request = False + if itemSet is not None: + for a in itemSet: + addr = None + try: + addr = socket.gethostbyname(a.host) + except socket.gaierror: + pass + with service._addr_cache_lock: + if addr is None: + a.address = None + else: + a.address = InetAddress(a.host, addr) + a.time_stamp = msec + a.used = False + except Exception as x: + service._log("Unhandled exception in TCF discovery DNS lookup thread", x) + self.dns_lookup_thread = DNSLookupThread() + class InputThread(threading.Thread): + def __init__(self, callable): + self._callable = callable + super(InputThread, self).__init__() + def run(self): + try: + while service._alive: + sock = service.socket + try: + data, addr = sock.recvfrom(MAX_PACKET_SIZE) + p = InputPacket(data, InetAddress(None, addr[0]), addr[1]) + protocol.invokeAndWait(self._callable, p) + except RuntimeError: + # TCF event dispatch is shutdown + return + except socket.error as x: + if sock != service.socket: continue + # frequent error on windows, unknown reason + if x.errno == 10054: continue + port = sock.getsockname()[1] + service._log("Cannot read from datagram socket at port %d" % port, x) + time.sleep(2) + except Exception as x: + service._log("Unhandled exception in socket reading thread", x) + self.input_thread = InputThread(self.__handleDatagramPacket) + try: + self.loopback_addr = InetAddress(None, "127.0.0.1") + self.out_buf[0:8] = 'TCF%s\0\0\0\0' % locator.CONF_VERSION + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + self.socket.bind(('', DISCOVEY_PORT)) + if __TRACE_DISCOVERY__: + logging.trace("Became the master agent (bound to port %d)" % self.socket.getsockname()[1]) + except socket.error as x: + self.socket.bind(('', 0)) + if __TRACE_DISCOVERY__: + logging.trace("Became a slave agent (bound to port %d)" % self.socket.getsockname()[1]) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.input_thread.setName("TCF Locator Receiver") + self.timer_thread.setName("TCF Locator Timer") + self.dns_lookup_thread.setName("TCF Locator DNS Lookup") + self.input_thread.setDaemon(True) + self.timer_thread.setDaemon(True) + self.dns_lookup_thread.setDaemon(True) + self.input_thread.start() + self.timer_thread.start() + self.dns_lookup_thread.start() + class LocatorListener(locator.LocatorListener): + def peerAdded(self, peer): + service._sendPeerInfo(peer, None, 0) + def peerChanged(self, peer): + service._sendPeerInfo(peer, None, 0) + self.listeners.append(LocatorListener()) + self.__refreshSubNetList() + self.__sendPeersRequest(None, 0) + self.__sendAll(None, 0, None, int(time.time() * 1000)) + except Exception as x: + self._log("Cannot open UDP socket for TCF discovery protocol", x) + + def _shutdown(self): + if self._alive: + self._alive = False + + def __makeErrorReport(self, code, msg): + err = {} + err[errors.ERROR_TIME] = int(time.time() * 1000) + err[errors.ERROR_CODE] = code + err[errors.ERROR_FORMAT] = msg + return err + + def __command(self, channel, token, name, data): + try: + if name == "redirect": + peer_id = fromJSONSequence(data)[0] + _peer = self.peers.get(peer_id) + if _peer is None: + error = self.__makeErrorReport(errors.TCF_ERROR_UNKNOWN_PEER, "Unknown peer ID") + channel.sendResult(token, toJSONSequence((error,))) + return + channel.sendResult(token, toJSONSequence((None,))) + if isinstance(_peer, peer.LocalPeer): + channel.sendEvent(protocol.getLocator(), "Hello", toJSONSequence((channel.getLocalServices(),))) + return + ChannelProxy(channel, _peer.openChannel()) + elif name == "sync": + channel.sendResult(token, None) + elif name == "getPeers": + arr = [] + for p in self.peers.values(): + arr.append(p.getAttributes()) + channel.sendResult(token, toJSONSequence((None, arr))) + else: + channel.rejectCommand(token) + except Exception as x: + channel.terminate(x) + + def _log(self, msg, x): + if not self._alive: return + # Don't report same error multiple times to avoid filling up the log file. + with self._error_log_lock: + if msg in self.error_log: return + self.error_log.add(msg) + protocol.log(msg, x) + + def __getInetAddress(self, host): + if not host: return None + with self._addr_cache_lock: + i = self.addr_cache.get(host) + if i is None: + i = AddressCacheItem(host) + ch = host[0] + if ch == '[' or ch == ':' or ch >= '0' and ch <= '9': + try: + addr = socket.gethostbyname(host) + i.address = InetAddress(host, addr) + except socket.gaierror: + pass + i.time_stamp = int(time.time() * 1000) + else: + # socket.gethostbyname() can cause long delay - delegate to background thread + LocatorService.addr_request = True + self._addr_cache_lock.notify() + self.addr_cache[host] = i + i.used = True + return i.address + + def __refresh_timer(self): + tm = int(time.time() * 1000) + # Cleanup slave table + if self.slaves: + i = 0 + while i < len(self.slaves): + s = self.slaves[i] + if s.last_packet_time + locator.DATA_RETENTION_PERIOD < tm: + del self.slaves[i] + else: + i += 1 + + # Cleanup peers table + stale_peers = None + for p in self.peers.values(): + if isinstance(p, peer.RemotePeer): + if p.getLastUpdateTime() + locator.DATA_RETENTION_PERIOD < tm: + if stale_peers == None: stale_peers = [] + stale_peers.append(p) + if stale_peers is not None: + for p in stale_peers: p.dispose() + + # Try to become a master + port = self.socket.getsockname()[1] + if port != DISCOVEY_PORT and \ + self.last_master_packet_time + locator.DATA_RETENTION_PERIOD / 2 <= tm: + s0 = self.socket + s1 = None + try: + s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s1.bind(DISCOVEY_PORT) + s1.setsockopt(socket.SOL_UDP, socket.SO_BROADCAST, 1) + self.socket = s1 + s0.close() + except: + pass + self.__refreshSubNetList() + if port != DISCOVEY_PORT: + for subnet in self.subnets: + self.__addSlave(subnet.address, port, tm, tm) + self.__sendAll(None, 0, None, tm) + + def __addSlave(self, addr, port, timestamp, time_now): + for s in self.slaves: + if s.port == port and s.address == addr: + if s.last_packet_time < timestamp: s.last_packet_time = timestamp + return s + s = Slave(addr, port) + s.last_packet_time = timestamp + self.slaves.append(s) + self.__sendPeersRequest(addr, port) + self.__sendAll(addr, port, s, time_now) + self.__sendSlaveInfo(s, time_now) + return s + + def __refreshSubNetList(self): + subNetSet = set() + try: + self.__getSubNetList(subNetSet) + except Exception as x: + self._log("Cannot get list of network interfaces", x) + for s in tuple(self.subnets): + if s in subNetSet: continue + self.subnets.remove(s) + for s in subNetSet: + if s in self.subnets: continue + self.subnets.add(s) + if __TRACE_DISCOVERY__: + str = cStringIO.StringIO() + str.write("Refreshed subnet list:") + for subnet in self.subnets: + str.write("\n\t* address=%s, broadcast=%s" % (subnet.address, subnet.broadcast)) + logging.trace(str.getvalue()) + + def __getSubNetList(self, set): + # TODO iterate over network interfaces to get proper broadcast addresses + hostname = socket.gethostname() + _, _, addresses = socket.gethostbyname_ex(hostname) + if not "127.0.0.1" in addresses: + addresses.append("127.0.0.1") + for address in addresses: + rawaddr = socket.inet_aton(address) + if len(rawaddr) != 4: continue + rawaddr = rawaddr[:3] + '\xFF' + broadcast = socket.inet_ntoa(rawaddr) + set.add(SubNet(24, InetAddress(hostname, address), InetAddress(None, broadcast))) + + def __getUTF8Bytes(self, s): + return s.encode("UTF-8") + + # Used for tracing + packetTypes = [ + None, + "CONF_REQ_INFO", + "CONF_PEER_INFO", + "CONF_REQ_SLAVES", + "CONF_SLAVES_INFO", + "CONF_PEERS_REMOVED" + ] + + def __sendDatagramPacket(self, subnet, size, addr, port): + try: + if addr is None: + addr = subnet.broadcast + port = DISCOVEY_PORT + for slave in self.slaves: + self.__sendDatagramPacket(subnet, size, slave.address, slave.port) + if not subnet.contains(addr): return False + if port == self.socket.getsockname()[1] and addr == subnet.address: return False + self.socket.sendto(str(self.out_buf[:size]), (addr.getHostAddress(), port)) + + if __TRACE_DISCOVERY__: + map = None + if self.out_buf[4] == locator.CONF_PEER_INFO: + map = self.__parsePeerAttributes(self.out_buf, 8) + elif self.out_buf[4] == locator.CONF_SLAVES_INFO: + map = self.__parseIDs(self.out_buf, size) + elif self.out_buf[4] == locator.CONF_PEERS_REMOVED: + map = self.__parseIDs(self.out_buf, size) + self.__traceDiscoveryPacket(False, self.packetTypes[self.out_buf[4]], map, addr, port) + except Exception as x: + self._log("Cannot send datagram packet to %s" % addr, x) + return False + return True + + def __parsePeerAttributes(self, data, size): + """ + Parse peer attributes in CONF_PEER_INFO packet data + + @param data - the packet data + @param size - the packet size + @return a map containing the attributes + """ + map = {} + s = data[8:size - 8].decode("UTF-8") + l = len(s) + i = 0 + while i < l: + i0 = i + while i < l and s[i] != '=' and s[i] != '\0': i += 1 + i1 = i + if i < l and s[i] == '=': i += 1 + i2 = i + while i < l and s[i] != '\0': i += 1 + i3 = i + if i < l and s[i] == '\0': i += 1 + key = s[i0:i1] + val = s[i2:i3] + map[key] = val + return map + + def __parseIDs(self, data, size): + """ + Parse list of IDs in CONF_SLAVES_INFO and CONF_PEERS_REMOVED packet data. + + @param data - the packet data + @param size - the packet size + @return a map containing the IDs + """ + cnt = 0 + map = {} + s = data[8:size - 8].decode("UTF-8") + l = len(s) + i = 0 + while i < l: + i0 = i + while i < l and s[i] != '\0': i += 1 + if i > i0: + id = s[i0:i] + map[str(cnt)] = id + cnt += 1 + while i < l and s[i] == '\0': i += 1 + return map + + def __sendPeersRequest(self, addr, port): + self.out_buf[4] = locator.CONF_REQ_INFO + for subnet in self.subnets: + self.__sendDatagramPacket(subnet, 8, addr, port) + + def _sendPeerInfo(self, _peer, addr, port): + attrs = _peer.getAttributes() + peer_addr = self.__getInetAddress(attrs.get(peer.ATTR_IP_HOST)) + if peer_addr is None: return + if attrs.get(peer.ATTR_IP_PORT) is None: return + self.out_buf[4] = locator.CONF_PEER_INFO + i = 8 + + for subnet in self.subnets: + if isinstance(_peer, peer.RemotePeer): + if self.socket.getsockname()[1] != DISCOVEY_PORT: return + if not subnet.address == self.loopback_addr and not subnet.address == peer_addr: continue + if not subnet.address == self.loopback_addr: + if not subnet.contains(peer_addr): continue + if i == 8: + sb = cStringIO.StringIO() + for key in attrs.keys(): + sb.write(key) + sb.write('=') + sb.write(attrs.get(key)) + sb.write('\0') + bt = self.__getUTF8Bytes(sb.getvalue()) + if i + len(bt) > len(self.out_buf): return + self.out_buf[i:i+len(bt)] = bt + i += len(bt) + if self.__sendDatagramPacket(subnet, i, addr, port): subnet.send_all_ok = True + + def __sendEmptyPacket(self, addr, port): + self.out_buf[4] = locator.CONF_SLAVES_INFO + for subnet in self.subnets: + if subnet.send_all_ok: continue + self.__sendDatagramPacket(subnet, 8, addr, port) + + def __sendAll(self, addr, port, sl, tm): + for subnet in self.subnets: subnet.send_all_ok = False + for peer in self.peers.values(): self._sendPeerInfo(peer, addr, port) + if addr is not None and sl is not None and sl.last_req_slaves_time + locator.DATA_RETENTION_PERIOD >= tm: + self.__sendSlavesInfo(addr, port, tm) + self.__sendEmptyPacket(addr, port) + + def __sendSlavesRequest(self, subnet, addr, port): + self.out_buf[4] = locator.CONF_REQ_SLAVES + self.__sendDatagramPacket(subnet, 8, addr, port) + + def __sendSlaveInfo(self, x, tm): + ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm + if ttl <= 0: return + self.out_buf[4] = locator.CONF_SLAVES_INFO + for subnet in self.subnets: + if not subnet.contains(x.address): continue + i = 8 + s = "%d:%s:%s" % (ttl, x.port, x.address.getHostAddress()) + bt = self.__getUTF8Bytes(s) + self.out_buf[i:i+len(bt)] = bt + i += len(bt) + self.out_buf[i] = 0 + i += 1 + for y in self.slaves: + if not subnet.contains(y.address): continue + if y.last_req_slaves_time + locator.DATA_RETENTION_PERIOD < tm: continue + self.__sendDatagramPacket(subnet, i, y.address, y.port) + + def __sendSlavesInfo(self, addr, port, tm): + self.out_buf[4] = locator.CONF_SLAVES_INFO + for subnet in self.subnets: + if not subnet.contains(addr): continue + i = 8 + for x in self.slaves: + ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm + if ttl <= 0: return + if x.port == port and x.address == addr: continue + if not subnet.address == self.loopback_addr: + if not subnet.contains(x.address): continue + subnet.send_all_ok = True + s = "%d:%s:%s" % (x.last_packet_time, x.port, x.address.getHostAddress()) + bt = self.__getUTF8Bytes(s) + if i > 8 and i + len(bt) >= PREF_PACKET_SIZE: + self.__sendDatagramPacket(subnet, i, addr, port) + i = 8 + self.out_buf[i:len(bt)] = bt + i += len(bt) + self.out_buf[i] = 0 + i += 1 + if i > 8: self.__sendDatagramPacket(subnet, i, addr, port) + + def __isRemote(self, address, port): + if port != self.socket.getsockname()[1]: return True + for s in self.subnets: + if s.address == address: return False + return True + + def __handleDatagramPacket(self, p): + try: + tm = int(time.time() * 1000) + buf = p.getData() + len = p.getLength() + if len < 8: return + if buf[0] != 'T': return + if buf[1] != 'C': return + if buf[2] != 'F': return + if buf[3] != locator.CONF_VERSION: return + remote_port = p.getPort() + remote_address = p.getAddress() + if self.__isRemote(remote_address, remote_port): + if buf[4] == locator.CONF_PEERS_REMOVED: + self.__handlePeerRemovedPacket(p) + else: + sl = None + if remote_port != DISCOVEY_PORT: + sl = self.__addSlave(remote_address, remote_port, tm, tm) + code = ord(buf[4]) + if code == locator.CONF_PEER_INFO: + self.__handlePeerInfoPacket(p) + elif code == locator.CONF_REQ_INFO: + self.__handleReqInfoPacket(p, sl, tm) + elif code == locator.CONF_SLAVES_INFO: + self.__handleSlavesInfoPacket(p, tm) + elif code == locator.CONF_REQ_SLAVES: + self.__handleReqSlavesPacket(p, sl, tm) + for subnet in self.subnets: + if not subnet.contains(remote_address): continue + delay = locator.DATA_RETENTION_PERIOD / 3 + if remote_port != DISCOVEY_PORT: delay = locator.DATA_RETENTION_PERIOD / 32 + elif subnet.address != remote_address: delay = locator.DATA_RETENTION_PERIOD / 2 + if subnet.last_slaves_req_time + delay <= tm: + self.__sendSlavesRequest(subnet, remote_address, remote_port) + subnet.last_slaves_req_time = tm + if subnet.address == remote_address and remote_port == DISCOVEY_PORT: + self.last_master_packet_time = tm + except Exception as x: + self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x) + + def __handlePeerInfoPacket(self, p): + try: + map = self.__parsePeerAttributes(p.getData(), p.getLength()) + if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_PEER_INFO", map, p) + id = map.get(peer.ATTR_ID) + if id is None: raise RuntimeError("Invalid peer info: no ID") + ok = True + host = map.get(peer.ATTR_IP_HOST) + if host is not None: + ok = False + peer_addr = self.__getInetAddress(host) + if peer_addr is not None: + for subnet in self.subnets: + if subnet.contains(peer_addr): + ok = True + break + if ok: + _peer = self.peers.get(id) + if isinstance(_peer, peer.RemotePeer): + _peer.updateAttributes(map) + elif _peer is None: + peer.RemotePeer(map) + except Exception as x: + self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x) + + def __handleReqInfoPacket(self, p, sl, tm): + if __TRACE_DISCOVERY__: + self.__traceDiscoveryPacket(True, "CONF_REQ_INFO", None, p) + self.__sendAll(p.getAddress(), p.getPort(), sl, tm) + + def __handleSlavesInfoPacket(self, p, time_now): + try: + map = self.__parseIDs(p.getData(), p.getLength()) + if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", map, p) + for s in map.values(): + i = 0 + l = len(s) + time0 = i + while i < l and s[i] != ':' and s[i] != '\0': i += 1 + time1 = i + if i < l and s[i] == ':': i += 1 + port0 = i + while i < l and s[i] != ':' and s[i] != '\0': i += 1 + port1 = i + if i < l and s[i] == ':': i += 1 + host0 = i + while i < l and s[i] != '\0': i += 1 + host1 = i + port = int(s[port0:port1]) + timestamp = s[time0:time1] + host = s[host0:host1] + if port != DISCOVEY_PORT: + addr = self.__getInetAddress(host) + if addr is not None: + delta = 10006030 # 30 minutes + if len(timestamp) > 0: + time_val = int(timestamp) + else: + time_val = time_now + if time_val < 3600000: + # Time stamp is "time to live" in milliseconds + time_val = time_now + time_val / 1000 - locator.DATA_RETENTION_PERIOD + elif time_val < time_now / 1000 + 50000000: + # Time stamp is in seconds + time_val= 1000 + else: + # Time stamp is in milliseconds + pass + if time_val < time_now - delta or time_val > time_now + delta: + msg = "Invalid slave info timestamp: %s -> %s" % ( + timestamp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_val/1000.))) + self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), Exception(msg)) + time_val = time_now - locator.DATA_RETENTION_PERIOD / 2 + self.__addSlave(addr, port, time_val, time_now) + except Exception as x: + self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x) + + def __handleReqSlavesPacket(self, p, sl, tm): + if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p) + if sl is not None: sl.last_req_slaves_time = tm + self.__sendSlavesInfo(p.getAddress(), p.getPort(), tm) + + def __handlePeerRemovedPacket(self, p): + try: + map = self.__parseIDs(p.getData(), p.getLength()) + if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_PEERS_REMOVED", map, p) + for id in map.values(): + _peer = self.peers.get(id) + if isinstance(_peer, peer.RemotePeer): _peer.dispose() + except Exception as x: + self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x) + + + @classmethod + def getLocator(cls): + return cls.locator + + def getPeers(self): + assert protocol.isDispatchThread() + return self.peers + + def redirect(self, peer, done): + raise RuntimeError("Channel redirect cannot be done on local peer") + + def sync(self, done): + raise RuntimeError("Channel sync cannot be done on local peer") + + def addListener(self, listener): + assert listener is not None + assert protocol.isDispatchThread() + self.listeners.append(listener) + + def removeListener(self, listener): + assert protocol.isDispatchThread() + self.listeners.remove(listener) + + @classmethod + def __traceDiscoveryPacket(cls, received, type, attrs, addr, port=None): + """ + Log that a TCF Discovery packet has be sent or received. The trace is + sent to stdout. This should be called only if the tracing has been turned + on. + + @param received + True if the packet was sent, otherwise it was received + @param type + a string specifying the type of packet, e.g., "CONF_PEER_INFO" + @param attrs + a set of attributes relevant to the type of packet (typically + a peer's attributes) + @param addr + the network address the packet is being sent to + @param port + the port the packet is being sent to + """ + assert __TRACE_DISCOVERY__ + if port is None: + # addr is a InputPacket + port = addr.getPort() + addr = addr.getAddress() + str = cStringIO.StringIO() + str.write(type) + str.write((" sent to ", " received from ")[received]) + str.write("%s/%s" % (addr, port)) + if attrs is not None: + for key, value in attrs.items(): + str.write("\n\t%s=%s" % (key, value)) + logging.trace(str.getvalue()) + +class LocatorServiceProvider(services.ServiceProvider): + def getLocalService(self, _channel): + class CommandServer(channel.CommandServer): + def command(self, token, name, data): + LocatorService.locator.command(channel, token, name, data) + _channel.addCommandServer(LocatorService.locator, CommandServer()) + return (LocatorService.locator,) + +services.addServiceProvider(LocatorServiceProvider()) |