From d42dd34d8867dbb6a5518ea0546c4d7eba972d65 Mon Sep 17 00:00:00 2001 From: aleherbau Date: Wed, 11 May 2011 08:07:46 +0000 Subject: TCF Python: Fixed syntax errors in LocatorService (work in progress) --- python/src/tcf/services/local/LocatorService.py | 650 ++++++++++-------------- 1 file changed, 275 insertions(+), 375 deletions(-) (limited to 'python') diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py index 3093f3793..c9dbaf230 100644 --- a/python/src/tcf/services/local/LocatorService.py +++ b/python/src/tcf/services/local/LocatorService.py @@ -20,7 +20,7 @@ from tcf.services import locator from tcf.util import logging from tcf.channel import fromJSONSequence, toJSONSequence from tcf.channel.ChannelProxy import ChannelProxy -from tcf import protocol, peer, errors +from tcf import protocol, services, channel, peer, errors # Flag indicating whether tracing of the the discovery activity is enabled. __TRACE_DISCOVERY = False @@ -30,7 +30,6 @@ class SubNet(object): self.prefix_length = prefix_length self.address = address self.broadcast = broadcast - def contains(self, addr): if addr is None or self.address is None: return False a1 = addr.getAddress() @@ -51,30 +50,25 @@ class SubNet(object): if (a1[n] & m) != (a2[n] & m): return False i += 1 return True - def __eq__(self, o): if not isinstance(o, SubNet): return False return self.prefix_length == o.prefix_length and \ self.broadcast == o.broadcast and \ self.address == o.address - def __hash__(self): return hash(self.address) - def __str__(self): return "%s/%d" % (self.address.getHostAddress(), self.prefix_length) class Slave(object): # Time of last packet receiver from self slave last_packet_time = 0 - #Time of last REQ_SLAVES packet received from self slave last_req_slaves_time = 0 def __init__(self, address, port): self.address = address self.port = port - def __str__(self): return "%s/%d" % (self.address.getHostAddress(), self.port) @@ -92,28 +86,23 @@ class InetAddress(object): self.addr = addr def getHostAddress(self): return self.addr + def __eq__(self, other): + if not isinstance(other, InetAddress): return False + return self.host == other.host and self.addr == other.addr class InputPacket(object): - """ - Wrapper for final class DatagramPacket so its toString() can present - the value in the debugger in a readable fashion. - """ + "Wrapper for UDP packet data." def __init__(self, data, addr): self.data = data self.addr = addr - def getLength(self): return len(self.data) - def getData(self): return self.data - def getPort(self): return self.addr[1] - def getAddress(self): return self.addr[0] - def __str__(self): return "[address=%s,port=%d,data=\"%s\"]" % (self.getAddress(), self.getPort(), self.data) @@ -293,7 +282,7 @@ class LocatorService(locator.LocatorService): self.error_log.add(msg) protocol.log(msg, x) - def _getInetAddress(self, host): + def __getInetAddress(self, host): if not host: return None with self._addr_cache_lock: i = self.addr_cache.get(host) @@ -469,374 +458,285 @@ class LocatorService(locator.LocatorService): peer_addr = self.__getInetAddress(attrs.get(peer.ATTR_IP_HOST)) if peer_addr is None: return if attrs.get(peer.ATTR_IP_PORT) is None: return - self.out_buf[4] = CONF_PEER_INFO + self.out_buf[4] = locator.CONF_PEER_INFO i = 8 for subnet in self.subnets: if isinstance(_peer, peer.RemotePeer): if self.socket.getsockname()[1] != DISCOVEY_PORT: return - if (!subnet.address.equals(loopback_addr) && !subnet.address.equals(peer_addr)) continue - } - if (!subnet.address.equals(loopback_addr)) { - if (!subnet.contains(peer_addr)) continue - } - if (i == 8) { - StringBuffer sb = new StringBuffer(out_buf.length) - for (str key : attrs.keySet()) { - sb.append(key) - sb.append('=') - sb.append(attrs.get(key)) - sb.append((char)0) - } - byte[] bt = getUTF8Bytes(sb.toString()) - if (i + bt.length > out_buf.length) return - System.arraycopy(bt, 0, out_buf, i, bt.length) - i += bt.length - } - if (sendDatagramPacket(subnet, i, addr, port)) subnet.send_all_ok = True - } - } - - private void sendEmptyPacket(InetAddress addr, int port) { - out_buf[4] = CONF_SLAVES_INFO - for (SubNet subnet : subnets) { - if (subnet.send_all_ok) continue - sendDatagramPacket(subnet, 8, addr, port) - } - } - - private void sendAll(InetAddress addr, int port, Slave sl, long time) { - for (SubNet subnet : subnets) subnet.send_all_ok = False - for (IPeer peer : peers.values()) sendPeerInfo(peer, addr, port) - if (addr != None && sl != None && sl.last_req_slaves_time + DATA_RETENTION_PERIOD >= time) { - sendSlavesInfo(addr, port, time) - } - sendEmptyPacket(addr, port) - } - - private void sendSlavesRequest(SubNet subnet, InetAddress addr, int port) { - out_buf[4] = CONF_REQ_SLAVES - sendDatagramPacket(subnet, 8, addr, port) - } - - private void sendSlaveInfo(Slave x, long time) { - int ttl = (int)(x.last_packet_time + DATA_RETENTION_PERIOD - time) - if (ttl <= 0) return - out_buf[4] = CONF_SLAVES_INFO - for (SubNet subnet : subnets) { - if (!subnet.contains(x.address)) continue - int i = 8 - str s = ttl + ":" + x.port + ":" + x.address.getHostAddress() - byte[] bt = getUTF8Bytes(s) - System.arraycopy(bt, 0, out_buf, i, bt.length) - i += bt.length - out_buf[i++] = 0 - for (Slave y : slaves) { - if (!subnet.contains(y.address)) continue - if (y.last_req_slaves_time + DATA_RETENTION_PERIOD < time) continue - sendDatagramPacket(subnet, i, y.address, y.port) - } - } - } - - private void sendSlavesInfo(InetAddress addr, int port, long time) { - out_buf[4] = CONF_SLAVES_INFO - for (SubNet subnet : subnets) { - if (!subnet.contains(addr)) continue - int i = 8 - for (Slave x : slaves) { - int ttl = (int)(x.last_packet_time + DATA_RETENTION_PERIOD - time) - if (ttl <= 0) continue - if (x.port == port && x.address.equals(addr)) continue - if (!subnet.address.equals(loopback_addr)) { - if (!subnet.contains(x.address)) continue - } + if not subnet.address == self.loopback_addr and not subnet.address == peer_addr: continue + if not subnet.address == self.loopback_addr: + if not subnet.contains(peer_addr): continue + if i == 8: + sb = cStringIO.StringIO() + for key in attrs.keys(): + sb.write(key) + sb.write('=') + sb.write(attrs.get(key)) + sb.write('\0') + bt = self.__getUTF8Bytes(sb.getvalue()) + if i + len(bt) > len(self.out_buf): return + self.out_buf[i:i+len(bt)] = bt + i += len(bt) + if self.__sendDatagramPacket(subnet, i, addr, port): subnet.send_all_ok = True + + def __sendEmptyPacket(self, addr, port): + self.out_buf[4] = locator.CONF_SLAVES_INFO + for subnet in self.subnets: + if subnet.send_all_ok: continue + self.__sendDatagramPacket(subnet, 8, addr, port) + + def __sendAll(self, addr, port, sl, tm): + for subnet in self.subnets: subnet.send_all_ok = False + for peer in self.peers.values(): self.__sendPeerInfo(peer, addr, port) + if addr is not None and sl is not None and sl.last_req_slaves_time + locator.DATA_RETENTION_PERIOD >= tm: + self.__sendSlavesInfo(addr, port, tm) + self.__sendEmptyPacket(addr, port) + + def __sendSlavesRequest(self, subnet, addr, port): + self.out_buf[4] = locator.CONF_REQ_SLAVES + self.__sendDatagramPacket(subnet, 8, addr, port) + + def __sendSlaveInfo(self, x, tm): + ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm + if ttl <= 0: return + self.out_buf[4] = locator.CONF_SLAVES_INFO + for subnet in self.subnets: + if not subnet.contains(x.address): continue + i = 8 + s = "%d:%s:%s" % (ttl, x.port, x.address.getHostAddress()) + bt = self.__getUTF8Bytes(s) + self.out_buf[i:i+len(bt)] = bt + i += len(bt) + self.out_buf[i] = 0 + i += 1 + for y in self.slaves: + if not subnet.contains(y.address): continue + if y.last_req_slaves_time + locator.DATA_RETENTION_PERIOD < tm: continue + self.__sendDatagramPacket(subnet, i, y.address, y.port) + + def __sendSlavesInfo(self, addr, port, tm): + self.out_buf[4] = locator.CONF_SLAVES_INFO + for subnet in self.subnets: + if not subnet.contains(addr): continue + i = 8 + for x in self.slaves: + ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm + if ttl <= 0: return + if x.port == port and x.address == addr: continue + if not subnet.address == self.loopback_addr: + if not subnet.contains(x.address): continue subnet.send_all_ok = True - str s = x.last_packet_time + ":" + x.port + ":" + x.address.getHostAddress() - byte[] bt = getUTF8Bytes(s) - if (i > 8 && i + bt.length >= PREF_PACKET_SIZE) { - sendDatagramPacket(subnet, i, addr, port) + s = "%d:%s:%s" % (x.last_packet_time, x.port, x.address.getHostAddress()) + bt = self.__getUTF8Bytes(s) + if i > 8 and i + len(bt) >= PREF_PACKET_SIZE: + self.__sendDatagramPacket(subnet, i, addr, port) i = 8 - } - System.arraycopy(bt, 0, out_buf, i, bt.length) - i += bt.length - out_buf[i++] = 0 - } - if (i > 8) sendDatagramPacket(subnet, i, addr, port) - } - } - - private boolean isRemote(InetAddress address, int port) { - if (port != socket.getLocalPort()) return True - for (SubNet s : subnets) { - if (s.address.equals(address)) return False - } + self.out_buf[i:len(bt)] = bt + i += len(bt) + self.out_buf[i] = 0 + i += 1 + if i > 8: self.__sendDatagramPacket(subnet, i, addr, port) + + def __isRemote(self, address, port): + if port != self.socket.getsockname()[1]: return True + for s in self.subnets: + if s.address == address: return False return True - } - - private void handleDatagramPacket(InputPacket p) { - try { - long time = System.currentTimeMillis() - byte[] buf = p.getData() - int len = p.getLength() - if (len < 8) return - if (buf[0] != 'T') return - if (buf[1] != 'C') return - if (buf[2] != 'F') return - if (buf[3] != CONF_VERSION) return - int remote_port = p.getPort() - InetAddress remote_address = p.getAddress() - if (isRemote(remote_address, remote_port)) { - Slave sl = None - if (remote_port != DISCOVEY_PORT) { - sl = addSlave(remote_address, remote_port, time, time) - } - switch (buf[4]) { - case CONF_PEER_INFO: - handlePeerInfoPacket(p) - break - case CONF_REQ_INFO: - handleReqInfoPacket(p, sl, time) - break - case CONF_SLAVES_INFO: - handleSlavesInfoPacket(p, time) - break - case CONF_REQ_SLAVES: - handleReqSlavesPacket(p, sl, time) - break - } - for (SubNet subnet : subnets) { - if (!subnet.contains(remote_address)) continue - long delay = DATA_RETENTION_PERIOD / 3 - if (remote_port != DISCOVEY_PORT) delay = DATA_RETENTION_PERIOD / 32 - else if (!subnet.address.equals(remote_address)) delay = DATA_RETENTION_PERIOD / 2 - if (subnet.last_slaves_req_time + delay <= time) { - sendSlavesRequest(subnet, remote_address, remote_port) - subnet.last_slaves_req_time = time - } - if (subnet.address.equals(remote_address) && remote_port == DISCOVEY_PORT) { - last_master_packet_time = time - } - } - } - } - catch (Throwable x) { - log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x) - } - } - - private void handlePeerInfoPacket(InputPacket p) { - try { - Map map = parsePeerAtrributes(p.getData(), p.getLength()) - if (TRACE_DISCOVERY) traceDiscoveryPacket(True, "CONF_PEER_INFO", map, p) - str id = map.get(IPeer.ATTR_ID) - if (id == None) throw new Exception("Invalid peer info: no ID") - boolean ok = True - str host = map.get(IPeer.ATTR_IP_HOST) - if (host != None) { + + def __handleDatagramPacket(self, p): + try: + tm = int(time.time()) + buf = p.getData() + len = p.getLength() + if len < 8: return + if buf[0] != 'T': return + if buf[1] != 'C': return + if buf[2] != 'F': return + if buf[3] != locator.CONF_VERSION: return + remote_port = p.getPort() + remote_address = p.getAddress() + if self.__isRemote(remote_address, remote_port): + sl = None + if remote_port != DISCOVEY_PORT: + sl = self.__addSlave(remote_address, remote_port, time, time) + code = buf[4] + if code == locator.CONF_PEER_INFO: + self.__handlePeerInfoPacket(p) + elif code == locator.CONF_REQ_INFO: + self.__handleReqInfoPacket(p, sl, time) + elif code == locator.CONF_SLAVES_INFO: + self.__handleSlavesInfoPacket(p, time) + elif code == locator.CONF_REQ_SLAVES: + self.__handleReqSlavesPacket(p, sl, time) + for subnet in self.subnets: + if not subnet.contains(remote_address): continue + delay = locator.DATA_RETENTION_PERIOD / 3 + if remote_port != DISCOVEY_PORT: delay = locator.DATA_RETENTION_PERIOD / 32 + elif subnet.address != remote_address: delay = locator.DATA_RETENTION_PERIOD / 2 + if subnet.last_slaves_req_time + delay <= tm: + self.__sendSlavesRequest(subnet, remote_address, remote_port) + subnet.last_slaves_req_time = tm + if subnet.address == remote_address and remote_port == DISCOVEY_PORT: + self.last_master_packet_time = time + except exceptions.BaseException as x: + self.log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x) + + def __handlePeerInfoPacket(self, p): + try: + map = self.__parsePeerAtrributes(p.getData(), p.getLength()) + if __TRACE_DISCOVERY: self.__traceDiscoveryPacket(True, "CONF_PEER_INFO", map, p) + id = map.get(peer.ATTR_ID) + if id is None: raise exceptions.RuntimeError("Invalid peer info: no ID") + ok = True + host = map.get(peer.ATTR_IP_HOST) + if host is not None: ok = False - InetAddress peer_addr = getInetAddress(host) - if (peer_addr != None) { - for (SubNet subnet : subnets) { - if (subnet.contains(peer_addr)) { + peer_addr = self.__getInetAddress(host) + if peer_addr is not None: + for subnet in self.subnets: + if subnet.contains(peer_addr): ok = True break - } - } - } - } - if (ok) { - IPeer peer = peers.get(id) - if (peer instanceof RemotePeer) { - ((RemotePeer)peer).updateAttributes(map) - } - else if (peer == None) { - new RemotePeer(map) - } - } - } - catch (Exception x) { - log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x) - } - } - - private void handleReqInfoPacket(InputPacket p, Slave sl, long time) { - if (TRACE_DISCOVERY) { - traceDiscoveryPacket(True, "CONF_REQ_INFO", None, p) - } - sendAll(p.getAddress(), p.getPort(), sl, time) - } - - private void handleSlavesInfoPacket(InputPacket p, long time_now) { - try { - Map trace_map = None # used for tracing only - int slave_index = 0 # used for tracing only - if (TRACE_DISCOVERY) { - trace_map = new HashMap(3) - } - - str s = new str(p.getData(), 8, p.getLength() - 8, "UTF-8") - int l = s.length() - int i = 0 - while (i < l) { - int time0 = i - while (i < l&& s.charAt(i) != ':' && s.charAt(i) != 0) i++ - int time1 = i - if (i < l && s.charAt(i) == ':') i++ - int port0 = i - while (i < l&& s.charAt(i) != ':' && s.charAt(i) != 0) i++ - int port1 = i - if (i < l && s.charAt(i) == ':') i++ - int host0 = i - while (i < l && s.charAt(i) != 0) i++ - int host1 = i - if (i < l && s.charAt(i) == 0) i++ - int port = Integer.parseInt(s.substring(port0, port1)) - str timestamp = s.substring(time0, time1) - str host = s.substring(host0, host1) - if (TRACE_DISCOVERY) { - trace_map.put("slave[" + slave_index++ + ']', timestamp + ':' + port + ':' + host) - } - if (port != DISCOVEY_PORT) { - InetAddress addr = getInetAddress(host) - if (addr != None) { - long delta = 10006030 # 30 minutes - long time_val = timestamp.length() > 0 ? Long.parseLong(timestamp) : time_now - if (time_val < 3600000) { - """Time stamp is "time to live" in milliseconds""" - time_val = time_now + time_val / 1000 - DATA_RETENTION_PERIOD - } - else if (time_val < time_now / 1000 + 50000000) { - """Time stamp is in seconds""" + if ok: + _peer = self.peers.get(id) + if isinstance(_peer, peer.RemotePeer): + _peer.updateAttributes(map) + elif peer is None: + peer.RemotePeer(map) + except exceptions.BaseException as x: + self.log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x) + + def __handleReqInfoPacket(self, p, sl, tm): + if __TRACE_DISCOVERY: + self.__traceDiscoveryPacket(True, "CONF_REQ_INFO", None, p) + self.__sendAll(p.getAddress(), p.getPort(), sl, tm) + + def __handleSlavesInfoPacket(self, p, time_now): + try: + trace_map = None # used for tracing only + slave_index = 0 # used for tracing only + if __TRACE_DISCOVERY: + trace_map = {} + + s = p.getData()[8:p.getLength() - 8].decode("UTF-8") + l = len(s) + i = 0 + while i < l: + time0 = i + while i < l and s[i] != ':' and s[i] != '\0': i += 1 + time1 = i + if i < l and s[i] == ':': i += 1 + port0 = i + while i < l and s[i] != ':' and s[i] != '\0': i += 1 + port1 = i + if i < l and s[i] == ':': i += 1 + host0 = i + while i < l and s[i] != '\0': i += 1 + host1 = i + if i < l and s[i] == '\0': i += 1 + port = int(s[port0:port1]) + timestamp = s[time0:time1] + host = s[host0:host1] + if __TRACE_DISCOVERY: + trace_map.put("slave[%d]" % slave_index, '%s:%d:%s' % (timestamp, port, host)) + slave_index += 1 + if port != DISCOVEY_PORT: + addr = self.__getInetAddress(host) + if addr is not None: + delta = 10006030 # 30 minutes + if len(timestamp) > 0: + time_val = int(timestamp) + else: + time_val = time_now + if time_val < 3600000: + # Time stamp is "time to live" in milliseconds + time_val = time_now + time_val / 1000 - locator.DATA_RETENTION_PERIOD + elif time_val < time_now / 1000 + 50000000: + # Time stamp is in seconds time_val= 1000 - } - else { - """Time stamp is in milliseconds""" - } - if (time_val < time_now - delta || time_val > time_now + delta) { - SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") - str msg = - "Invalid slave info timestamp: " + timestamp + - " -> " + fmt.format(new Date(time_val)) - log("Invalid datagram packet received from " + - p.getAddress() + "/" + p.getPort(), - new Exception(msg)) - time_val = time_now - DATA_RETENTION_PERIOD / 2 - } - addSlave(addr, port, time_val, time_now) - } - } - } - if (TRACE_DISCOVERY) { - traceDiscoveryPacket(True, "CONF_SLAVES_INFO", trace_map, p) - } - } - catch (Exception x) { - log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x) - } - } - - private void handleReqSlavesPacket(InputPacket p, Slave sl, long time) { - if (TRACE_DISCOVERY) { - traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p) - } - if (sl != None) sl.last_req_slaves_time = time - sendSlavesInfo(p.getAddress(), p.getPort(), time) - } - - """----------------------------------------------------------------------------------""" - - public static LocatorService getLocator() { - return locator - } - - public str getName() { - return NAME - } - - public Map getPeers() { - assert Protocol.isDispatchThread() - return peers - } - - public IToken redirect(str peer_id, DoneRedirect done) { - throw new Error("Channel redirect cannot be done on local peer") - } - - public IToken redirect(Map peer, DoneRedirect done) { - throw new Error("Channel redirect cannot be done on local peer") - } - - public IToken sync(DoneSync done) { - throw new Error("Channel sync cannot be done on local peer") - } - - public void addListener(LocatorListener listener) { - assert listener != None - assert Protocol.isDispatchThread() - listeners.add(listener) - } - - public void removeListener(LocatorListener listener) { - assert Protocol.isDispatchThread() - listeners.remove(listener) - } - - """ - Log that a TCF Discovery packet has be sent or received. The trace is - sent to stdout. This should be called only if the tracing has been turned - on via java property definitions. - - @param received - True if the packet was sent, otherwise it was received - @param type - a string specifying the type of packet, e.g., "CONF_PEER_INFO" - @param attrs - a set of attributes relevant to the type of packet (typically - a peer's attributes) - @param addr - the network address the packet is being sent to - @param port - the port the packet is being sent to - """ - private static void traceDiscoveryPacket(boolean received, str type, Map attrs, InetAddress addr, int port) { - assert TRACE_DISCOVERY - StringBuilder str = new StringBuilder(type + (received ? " received from " : " sent to ") + addr + "/" + port) - if (attrs != None) { - Iterator> iter = attrs.entrySet().iterator() - while (iter.hasNext()) { - Entry entry = iter.next() - str.append("\n\t" + entry.getKey() + '=' + entry.getValue()) - } - } - logging.trace(str.toString()) - } - - """ - Convenience variant that takes a DatagramPacket for specifying - the target address and port. - """ - private static void traceDiscoveryPacket(boolean received, str type, Map attrs, InputPacket packet) { - traceDiscoveryPacket(received, type, attrs, packet.getAddress(), packet.getPort()) - } -} - - -static { - ServiceManager.addServiceProvider(new IServiceProvider() { - - public IService[] getLocalService(final IChannel channel) { - channel.addCommandServer(locator, new IChannel.ICommandServer() { - public void command(IToken token, str name, byte[] data) { - locator.command((AbstractChannel)channel, token, name, data) - } - }) - return new IService[]{ locator } - } - - public IService getServiceProxy(IChannel channel, str service_name) { - return None - } - }) -} + else: + # Time stamp is in milliseconds + pass + if time_val < time_now - delta or time_val > time_now + delta: + msg = "Invalid slave info timestamp: %s -> %s" % ( + timestamp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_val/1000.))) + self.log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), exceptions.Exception(msg)) + time_val = time_now - locator.DATA_RETENTION_PERIOD / 2 + self.__addSlave(addr, port, time_val, time_now) + if __TRACE_DISCOVERY: + self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", trace_map, p) + except exceptions.BaseException as x: + self.log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x) + + def __handleReqSlavesPacket(self, p, sl, tm): + if __TRACE_DISCOVERY: + self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p) + if sl is not None: sl.last_req_slaves_time = tm + self.__sendSlavesInfo(p.getAddress(), p.getPort(), tm) + + @classmethod + def getLocator(cls): + return cls.locator + + def getPeers(self): + assert protocol.isDispatchThread() + return self.peers + + def redirect(self, peer, done): + raise RuntimeError("Channel redirect cannot be done on local peer") + + def sync(self, done): + raise RuntimeError("Channel sync cannot be done on local peer") + def addListener(self, listener): + assert listener is not None + assert protocol.isDispatchThread() + self.listeners.add(listener) + + def removeListener(self, listener): + assert protocol.isDispatchThread() + self.listeners.remove(listener) + + @classmethod + def __traceDiscoveryPacket(cls, received, type, attrs, addr, port=None): + """ + Log that a TCF Discovery packet has be sent or received. The trace is + sent to stdout. This should be called only if the tracing has been turned + on. + + @param received + True if the packet was sent, otherwise it was received + @param type + a string specifying the type of packet, e.g., "CONF_PEER_INFO" + @param attrs + a set of attributes relevant to the type of packet (typically + a peer's attributes) + @param addr + the network address the packet is being sent to + @param port + the port the packet is being sent to + """ + assert __TRACE_DISCOVERY + if port is None: + # addr is a InputPacket + addr = addr.getAddress() + port = addr.getPort() + str = cStringIO.StringIO() + str.write(type) + str.write((" sent to ", " received from ")[received]) + str.write("%s/%s" % (addr, port)) + if attrs is not None: + for key, value in attrs.items(): + str.write("\n\t%s=%s" % (key, value)) + logging.trace(str.getvalue()) + +class LocatorServiceProvider(services.ServiceProvider): + def getLocalService(self, _channel): + class CommandServer(channel.CommandServer): + def command(self, token, name, data): + LocatorService.locator.command(channel, token, name, data) + _channel.addCommandServer(LocatorService.locator, CommandServer()) + return (LocatorService.locator,) + +services.addServiceProvider(LocatorServiceProvider()) -- cgit v1.2.3