Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraleherbau2011-05-11 04:07:46 -0400
committeraleherbau2011-05-11 04:07:46 -0400
commitd42dd34d8867dbb6a5518ea0546c4d7eba972d65 (patch)
treeb4a8954adc324730ed00c579f867068a3713eee2
parent996c136001830942d34c7ae184eec5d2fbf85ac4 (diff)
downloadorg.eclipse.tcf-d42dd34d8867dbb6a5518ea0546c4d7eba972d65.tar.gz
org.eclipse.tcf-d42dd34d8867dbb6a5518ea0546c4d7eba972d65.tar.xz
org.eclipse.tcf-d42dd34d8867dbb6a5518ea0546c4d7eba972d65.zip
TCF Python: Fixed syntax errors in LocatorService (work in progress)
-rw-r--r--python/src/tcf/services/local/LocatorService.py650
1 files changed, 275 insertions, 375 deletions
diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py
index 3093f3793..c9dbaf230 100644
--- a/python/src/tcf/services/local/LocatorService.py
+++ b/python/src/tcf/services/local/LocatorService.py
@@ -20,7 +20,7 @@ from tcf.services import locator
from tcf.util import logging
from tcf.channel import fromJSONSequence, toJSONSequence
from tcf.channel.ChannelProxy import ChannelProxy
-from tcf import protocol, peer, errors
+from tcf import protocol, services, channel, peer, errors
# Flag indicating whether tracing of the the discovery activity is enabled.
__TRACE_DISCOVERY = False
@@ -30,7 +30,6 @@ class SubNet(object):
self.prefix_length = prefix_length
self.address = address
self.broadcast = broadcast
-
def contains(self, addr):
if addr is None or self.address is None: return False
a1 = addr.getAddress()
@@ -51,30 +50,25 @@ class SubNet(object):
if (a1[n] & m) != (a2[n] & m): return False
i += 1
return True
-
def __eq__(self, o):
if not isinstance(o, SubNet): return False
return self.prefix_length == o.prefix_length and \
self.broadcast == o.broadcast and \
self.address == o.address
-
def __hash__(self):
return hash(self.address)
-
def __str__(self):
return "%s/%d" % (self.address.getHostAddress(), self.prefix_length)
class Slave(object):
# Time of last packet receiver from self slave
last_packet_time = 0
-
#Time of last REQ_SLAVES packet received from self slave
last_req_slaves_time = 0
def __init__(self, address, port):
self.address = address
self.port = port
-
def __str__(self):
return "%s/%d" % (self.address.getHostAddress(), self.port)
@@ -92,28 +86,23 @@ class InetAddress(object):
self.addr = addr
def getHostAddress(self):
return self.addr
+ def __eq__(self, other):
+ if not isinstance(other, InetAddress): return False
+ return self.host == other.host and self.addr == other.addr
class InputPacket(object):
- """
- Wrapper for final class DatagramPacket so its toString() can present
- the value in the debugger in a readable fashion.
- """
+ "Wrapper for UDP packet data."
def __init__(self, data, addr):
self.data = data
self.addr = addr
-
def getLength(self):
return len(self.data)
-
def getData(self):
return self.data
-
def getPort(self):
return self.addr[1]
-
def getAddress(self):
return self.addr[0]
-
def __str__(self):
return "[address=%s,port=%d,data=\"%s\"]" % (self.getAddress(), self.getPort(), self.data)
@@ -293,7 +282,7 @@ class LocatorService(locator.LocatorService):
self.error_log.add(msg)
protocol.log(msg, x)
- def _getInetAddress(self, host):
+ def __getInetAddress(self, host):
if not host: return None
with self._addr_cache_lock:
i = self.addr_cache.get(host)
@@ -469,374 +458,285 @@ class LocatorService(locator.LocatorService):
peer_addr = self.__getInetAddress(attrs.get(peer.ATTR_IP_HOST))
if peer_addr is None: return
if attrs.get(peer.ATTR_IP_PORT) is None: return
- self.out_buf[4] = CONF_PEER_INFO
+ self.out_buf[4] = locator.CONF_PEER_INFO
i = 8
for subnet in self.subnets:
if isinstance(_peer, peer.RemotePeer):
if self.socket.getsockname()[1] != DISCOVEY_PORT: return
- if (!subnet.address.equals(loopback_addr) && !subnet.address.equals(peer_addr)) continue
- }
- if (!subnet.address.equals(loopback_addr)) {
- if (!subnet.contains(peer_addr)) continue
- }
- if (i == 8) {
- StringBuffer sb = new StringBuffer(out_buf.length)
- for (str key : attrs.keySet()) {
- sb.append(key)
- sb.append('=')
- sb.append(attrs.get(key))
- sb.append((char)0)
- }
- byte[] bt = getUTF8Bytes(sb.toString())
- if (i + bt.length > out_buf.length) return
- System.arraycopy(bt, 0, out_buf, i, bt.length)
- i += bt.length
- }
- if (sendDatagramPacket(subnet, i, addr, port)) subnet.send_all_ok = True
- }
- }
-
- private void sendEmptyPacket(InetAddress addr, int port) {
- out_buf[4] = CONF_SLAVES_INFO
- for (SubNet subnet : subnets) {
- if (subnet.send_all_ok) continue
- sendDatagramPacket(subnet, 8, addr, port)
- }
- }
-
- private void sendAll(InetAddress addr, int port, Slave sl, long time) {
- for (SubNet subnet : subnets) subnet.send_all_ok = False
- for (IPeer peer : peers.values()) sendPeerInfo(peer, addr, port)
- if (addr != None && sl != None && sl.last_req_slaves_time + DATA_RETENTION_PERIOD >= time) {
- sendSlavesInfo(addr, port, time)
- }
- sendEmptyPacket(addr, port)
- }
-
- private void sendSlavesRequest(SubNet subnet, InetAddress addr, int port) {
- out_buf[4] = CONF_REQ_SLAVES
- sendDatagramPacket(subnet, 8, addr, port)
- }
-
- private void sendSlaveInfo(Slave x, long time) {
- int ttl = (int)(x.last_packet_time + DATA_RETENTION_PERIOD - time)
- if (ttl <= 0) return
- out_buf[4] = CONF_SLAVES_INFO
- for (SubNet subnet : subnets) {
- if (!subnet.contains(x.address)) continue
- int i = 8
- str s = ttl + ":" + x.port + ":" + x.address.getHostAddress()
- byte[] bt = getUTF8Bytes(s)
- System.arraycopy(bt, 0, out_buf, i, bt.length)
- i += bt.length
- out_buf[i++] = 0
- for (Slave y : slaves) {
- if (!subnet.contains(y.address)) continue
- if (y.last_req_slaves_time + DATA_RETENTION_PERIOD < time) continue
- sendDatagramPacket(subnet, i, y.address, y.port)
- }
- }
- }
-
- private void sendSlavesInfo(InetAddress addr, int port, long time) {
- out_buf[4] = CONF_SLAVES_INFO
- for (SubNet subnet : subnets) {
- if (!subnet.contains(addr)) continue
- int i = 8
- for (Slave x : slaves) {
- int ttl = (int)(x.last_packet_time + DATA_RETENTION_PERIOD - time)
- if (ttl <= 0) continue
- if (x.port == port && x.address.equals(addr)) continue
- if (!subnet.address.equals(loopback_addr)) {
- if (!subnet.contains(x.address)) continue
- }
+ if not subnet.address == self.loopback_addr and not subnet.address == peer_addr: continue
+ if not subnet.address == self.loopback_addr:
+ if not subnet.contains(peer_addr): continue
+ if i == 8:
+ sb = cStringIO.StringIO()
+ for key in attrs.keys():
+ sb.write(key)
+ sb.write('=')
+ sb.write(attrs.get(key))
+ sb.write('\0')
+ bt = self.__getUTF8Bytes(sb.getvalue())
+ if i + len(bt) > len(self.out_buf): return
+ self.out_buf[i:i+len(bt)] = bt
+ i += len(bt)
+ if self.__sendDatagramPacket(subnet, i, addr, port): subnet.send_all_ok = True
+
+ def __sendEmptyPacket(self, addr, port):
+ self.out_buf[4] = locator.CONF_SLAVES_INFO
+ for subnet in self.subnets:
+ if subnet.send_all_ok: continue
+ self.__sendDatagramPacket(subnet, 8, addr, port)
+
+ def __sendAll(self, addr, port, sl, tm):
+ for subnet in self.subnets: subnet.send_all_ok = False
+ for peer in self.peers.values(): self.__sendPeerInfo(peer, addr, port)
+ if addr is not None and sl is not None and sl.last_req_slaves_time + locator.DATA_RETENTION_PERIOD >= tm:
+ self.__sendSlavesInfo(addr, port, tm)
+ self.__sendEmptyPacket(addr, port)
+
+ def __sendSlavesRequest(self, subnet, addr, port):
+ self.out_buf[4] = locator.CONF_REQ_SLAVES
+ self.__sendDatagramPacket(subnet, 8, addr, port)
+
+ def __sendSlaveInfo(self, x, tm):
+ ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm
+ if ttl <= 0: return
+ self.out_buf[4] = locator.CONF_SLAVES_INFO
+ for subnet in self.subnets:
+ if not subnet.contains(x.address): continue
+ i = 8
+ s = "%d:%s:%s" % (ttl, x.port, x.address.getHostAddress())
+ bt = self.__getUTF8Bytes(s)
+ self.out_buf[i:i+len(bt)] = bt
+ i += len(bt)
+ self.out_buf[i] = 0
+ i += 1
+ for y in self.slaves:
+ if not subnet.contains(y.address): continue
+ if y.last_req_slaves_time + locator.DATA_RETENTION_PERIOD < tm: continue
+ self.__sendDatagramPacket(subnet, i, y.address, y.port)
+
+ def __sendSlavesInfo(self, addr, port, tm):
+ self.out_buf[4] = locator.CONF_SLAVES_INFO
+ for subnet in self.subnets:
+ if not subnet.contains(addr): continue
+ i = 8
+ for x in self.slaves:
+ ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm
+ if ttl <= 0: return
+ if x.port == port and x.address == addr: continue
+ if not subnet.address == self.loopback_addr:
+ if not subnet.contains(x.address): continue
subnet.send_all_ok = True
- str s = x.last_packet_time + ":" + x.port + ":" + x.address.getHostAddress()
- byte[] bt = getUTF8Bytes(s)
- if (i > 8 && i + bt.length >= PREF_PACKET_SIZE) {
- sendDatagramPacket(subnet, i, addr, port)
+ s = "%d:%s:%s" % (x.last_packet_time, x.port, x.address.getHostAddress())
+ bt = self.__getUTF8Bytes(s)
+ if i > 8 and i + len(bt) >= PREF_PACKET_SIZE:
+ self.__sendDatagramPacket(subnet, i, addr, port)
i = 8
- }
- System.arraycopy(bt, 0, out_buf, i, bt.length)
- i += bt.length
- out_buf[i++] = 0
- }
- if (i > 8) sendDatagramPacket(subnet, i, addr, port)
- }
- }
-
- private boolean isRemote(InetAddress address, int port) {
- if (port != socket.getLocalPort()) return True
- for (SubNet s : subnets) {
- if (s.address.equals(address)) return False
- }
+ self.out_buf[i:len(bt)] = bt
+ i += len(bt)
+ self.out_buf[i] = 0
+ i += 1
+ if i > 8: self.__sendDatagramPacket(subnet, i, addr, port)
+
+ def __isRemote(self, address, port):
+ if port != self.socket.getsockname()[1]: return True
+ for s in self.subnets:
+ if s.address == address: return False
return True
- }
-
- private void handleDatagramPacket(InputPacket p) {
- try {
- long time = System.currentTimeMillis()
- byte[] buf = p.getData()
- int len = p.getLength()
- if (len < 8) return
- if (buf[0] != 'T') return
- if (buf[1] != 'C') return
- if (buf[2] != 'F') return
- if (buf[3] != CONF_VERSION) return
- int remote_port = p.getPort()
- InetAddress remote_address = p.getAddress()
- if (isRemote(remote_address, remote_port)) {
- Slave sl = None
- if (remote_port != DISCOVEY_PORT) {
- sl = addSlave(remote_address, remote_port, time, time)
- }
- switch (buf[4]) {
- case CONF_PEER_INFO:
- handlePeerInfoPacket(p)
- break
- case CONF_REQ_INFO:
- handleReqInfoPacket(p, sl, time)
- break
- case CONF_SLAVES_INFO:
- handleSlavesInfoPacket(p, time)
- break
- case CONF_REQ_SLAVES:
- handleReqSlavesPacket(p, sl, time)
- break
- }
- for (SubNet subnet : subnets) {
- if (!subnet.contains(remote_address)) continue
- long delay = DATA_RETENTION_PERIOD / 3
- if (remote_port != DISCOVEY_PORT) delay = DATA_RETENTION_PERIOD / 32
- else if (!subnet.address.equals(remote_address)) delay = DATA_RETENTION_PERIOD / 2
- if (subnet.last_slaves_req_time + delay <= time) {
- sendSlavesRequest(subnet, remote_address, remote_port)
- subnet.last_slaves_req_time = time
- }
- if (subnet.address.equals(remote_address) && remote_port == DISCOVEY_PORT) {
- last_master_packet_time = time
- }
- }
- }
- }
- catch (Throwable x) {
- log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x)
- }
- }
-
- private void handlePeerInfoPacket(InputPacket p) {
- try {
- Map<str,str> map = parsePeerAtrributes(p.getData(), p.getLength())
- if (TRACE_DISCOVERY) traceDiscoveryPacket(True, "CONF_PEER_INFO", map, p)
- str id = map.get(IPeer.ATTR_ID)
- if (id == None) throw new Exception("Invalid peer info: no ID")
- boolean ok = True
- str host = map.get(IPeer.ATTR_IP_HOST)
- if (host != None) {
+
+ def __handleDatagramPacket(self, p):
+ try:
+ tm = int(time.time())
+ buf = p.getData()
+ len = p.getLength()
+ if len < 8: return
+ if buf[0] != 'T': return
+ if buf[1] != 'C': return
+ if buf[2] != 'F': return
+ if buf[3] != locator.CONF_VERSION: return
+ remote_port = p.getPort()
+ remote_address = p.getAddress()
+ if self.__isRemote(remote_address, remote_port):
+ sl = None
+ if remote_port != DISCOVEY_PORT:
+ sl = self.__addSlave(remote_address, remote_port, time, time)
+ code = buf[4]
+ if code == locator.CONF_PEER_INFO:
+ self.__handlePeerInfoPacket(p)
+ elif code == locator.CONF_REQ_INFO:
+ self.__handleReqInfoPacket(p, sl, time)
+ elif code == locator.CONF_SLAVES_INFO:
+ self.__handleSlavesInfoPacket(p, time)
+ elif code == locator.CONF_REQ_SLAVES:
+ self.__handleReqSlavesPacket(p, sl, time)
+ for subnet in self.subnets:
+ if not subnet.contains(remote_address): continue
+ delay = locator.DATA_RETENTION_PERIOD / 3
+ if remote_port != DISCOVEY_PORT: delay = locator.DATA_RETENTION_PERIOD / 32
+ elif subnet.address != remote_address: delay = locator.DATA_RETENTION_PERIOD / 2
+ if subnet.last_slaves_req_time + delay <= tm:
+ self.__sendSlavesRequest(subnet, remote_address, remote_port)
+ subnet.last_slaves_req_time = tm
+ if subnet.address == remote_address and remote_port == DISCOVEY_PORT:
+ self.last_master_packet_time = time
+ except exceptions.BaseException as x:
+ self.log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x)
+
+ def __handlePeerInfoPacket(self, p):
+ try:
+ map = self.__parsePeerAtrributes(p.getData(), p.getLength())
+ if __TRACE_DISCOVERY: self.__traceDiscoveryPacket(True, "CONF_PEER_INFO", map, p)
+ id = map.get(peer.ATTR_ID)
+ if id is None: raise exceptions.RuntimeError("Invalid peer info: no ID")
+ ok = True
+ host = map.get(peer.ATTR_IP_HOST)
+ if host is not None:
ok = False
- InetAddress peer_addr = getInetAddress(host)
- if (peer_addr != None) {
- for (SubNet subnet : subnets) {
- if (subnet.contains(peer_addr)) {
+ peer_addr = self.__getInetAddress(host)
+ if peer_addr is not None:
+ for subnet in self.subnets:
+ if subnet.contains(peer_addr):
ok = True
break
- }
- }
- }
- }
- if (ok) {
- IPeer peer = peers.get(id)
- if (peer instanceof RemotePeer) {
- ((RemotePeer)peer).updateAttributes(map)
- }
- else if (peer == None) {
- new RemotePeer(map)
- }
- }
- }
- catch (Exception x) {
- log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x)
- }
- }
-
- private void handleReqInfoPacket(InputPacket p, Slave sl, long time) {
- if (TRACE_DISCOVERY) {
- traceDiscoveryPacket(True, "CONF_REQ_INFO", None, p)
- }
- sendAll(p.getAddress(), p.getPort(), sl, time)
- }
-
- private void handleSlavesInfoPacket(InputPacket p, long time_now) {
- try {
- Map<str,str> trace_map = None # used for tracing only
- int slave_index = 0 # used for tracing only
- if (TRACE_DISCOVERY) {
- trace_map = new HashMap<str,str>(3)
- }
-
- str s = new str(p.getData(), 8, p.getLength() - 8, "UTF-8")
- int l = s.length()
- int i = 0
- while (i < l) {
- int time0 = i
- while (i < l&& s.charAt(i) != ':' && s.charAt(i) != 0) i++
- int time1 = i
- if (i < l && s.charAt(i) == ':') i++
- int port0 = i
- while (i < l&& s.charAt(i) != ':' && s.charAt(i) != 0) i++
- int port1 = i
- if (i < l && s.charAt(i) == ':') i++
- int host0 = i
- while (i < l && s.charAt(i) != 0) i++
- int host1 = i
- if (i < l && s.charAt(i) == 0) i++
- int port = Integer.parseInt(s.substring(port0, port1))
- str timestamp = s.substring(time0, time1)
- str host = s.substring(host0, host1)
- if (TRACE_DISCOVERY) {
- trace_map.put("slave[" + slave_index++ + ']', timestamp + ':' + port + ':' + host)
- }
- if (port != DISCOVEY_PORT) {
- InetAddress addr = getInetAddress(host)
- if (addr != None) {
- long delta = 10006030 # 30 minutes
- long time_val = timestamp.length() > 0 ? Long.parseLong(timestamp) : time_now
- if (time_val < 3600000) {
- """Time stamp is "time to live" in milliseconds"""
- time_val = time_now + time_val / 1000 - DATA_RETENTION_PERIOD
- }
- else if (time_val < time_now / 1000 + 50000000) {
- """Time stamp is in seconds"""
+ if ok:
+ _peer = self.peers.get(id)
+ if isinstance(_peer, peer.RemotePeer):
+ _peer.updateAttributes(map)
+ elif peer is None:
+ peer.RemotePeer(map)
+ except exceptions.BaseException as x:
+ self.log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
+
+ def __handleReqInfoPacket(self, p, sl, tm):
+ if __TRACE_DISCOVERY:
+ self.__traceDiscoveryPacket(True, "CONF_REQ_INFO", None, p)
+ self.__sendAll(p.getAddress(), p.getPort(), sl, tm)
+
+ def __handleSlavesInfoPacket(self, p, time_now):
+ try:
+ trace_map = None # used for tracing only
+ slave_index = 0 # used for tracing only
+ if __TRACE_DISCOVERY:
+ trace_map = {}
+
+ s = p.getData()[8:p.getLength() - 8].decode("UTF-8")
+ l = len(s)
+ i = 0
+ while i < l:
+ time0 = i
+ while i < l and s[i] != ':' and s[i] != '\0': i += 1
+ time1 = i
+ if i < l and s[i] == ':': i += 1
+ port0 = i
+ while i < l and s[i] != ':' and s[i] != '\0': i += 1
+ port1 = i
+ if i < l and s[i] == ':': i += 1
+ host0 = i
+ while i < l and s[i] != '\0': i += 1
+ host1 = i
+ if i < l and s[i] == '\0': i += 1
+ port = int(s[port0:port1])
+ timestamp = s[time0:time1]
+ host = s[host0:host1]
+ if __TRACE_DISCOVERY:
+ trace_map.put("slave[%d]" % slave_index, '%s:%d:%s' % (timestamp, port, host))
+ slave_index += 1
+ if port != DISCOVEY_PORT:
+ addr = self.__getInetAddress(host)
+ if addr is not None:
+ delta = 10006030 # 30 minutes
+ if len(timestamp) > 0:
+ time_val = int(timestamp)
+ else:
+ time_val = time_now
+ if time_val < 3600000:
+ # Time stamp is "time to live" in milliseconds
+ time_val = time_now + time_val / 1000 - locator.DATA_RETENTION_PERIOD
+ elif time_val < time_now / 1000 + 50000000:
+ # Time stamp is in seconds
time_val= 1000
- }
- else {
- """Time stamp is in milliseconds"""
- }
- if (time_val < time_now - delta || time_val > time_now + delta) {
- SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
- str msg =
- "Invalid slave info timestamp: " + timestamp +
- " -> " + fmt.format(new Date(time_val))
- log("Invalid datagram packet received from " +
- p.getAddress() + "/" + p.getPort(),
- new Exception(msg))
- time_val = time_now - DATA_RETENTION_PERIOD / 2
- }
- addSlave(addr, port, time_val, time_now)
- }
- }
- }
- if (TRACE_DISCOVERY) {
- traceDiscoveryPacket(True, "CONF_SLAVES_INFO", trace_map, p)
- }
- }
- catch (Exception x) {
- log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x)
- }
- }
-
- private void handleReqSlavesPacket(InputPacket p, Slave sl, long time) {
- if (TRACE_DISCOVERY) {
- traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p)
- }
- if (sl != None) sl.last_req_slaves_time = time
- sendSlavesInfo(p.getAddress(), p.getPort(), time)
- }
-
- """----------------------------------------------------------------------------------"""
-
- public static LocatorService getLocator() {
- return locator
- }
-
- public str getName() {
- return NAME
- }
-
- public Map<str,IPeer> getPeers() {
- assert Protocol.isDispatchThread()
- return peers
- }
-
- public IToken redirect(str peer_id, DoneRedirect done) {
- throw new Error("Channel redirect cannot be done on local peer")
- }
-
- public IToken redirect(Map<str,str> peer, DoneRedirect done) {
- throw new Error("Channel redirect cannot be done on local peer")
- }
-
- public IToken sync(DoneSync done) {
- throw new Error("Channel sync cannot be done on local peer")
- }
-
- public void addListener(LocatorListener listener) {
- assert listener != None
- assert Protocol.isDispatchThread()
- listeners.add(listener)
- }
-
- public void removeListener(LocatorListener listener) {
- assert Protocol.isDispatchThread()
- listeners.remove(listener)
- }
-
- """
- Log that a TCF Discovery packet has be sent or received. The trace is
- sent to stdout. This should be called only if the tracing has been turned
- on via java property definitions.
-
- @param received
- True if the packet was sent, otherwise it was received
- @param type
- a string specifying the type of packet, e.g., "CONF_PEER_INFO"
- @param attrs
- a set of attributes relevant to the type of packet (typically
- a peer's attributes)
- @param addr
- the network address the packet is being sent to
- @param port
- the port the packet is being sent to
- """
- private static void traceDiscoveryPacket(boolean received, str type, Map<str,str> attrs, InetAddress addr, int port) {
- assert TRACE_DISCOVERY
- StringBuilder str = new StringBuilder(type + (received ? " received from " : " sent to ") + addr + "/" + port)
- if (attrs != None) {
- Iterator<Entry<str, str>> iter = attrs.entrySet().iterator()
- while (iter.hasNext()) {
- Entry<str, str> entry = iter.next()
- str.append("\n\t" + entry.getKey() + '=' + entry.getValue())
- }
- }
- logging.trace(str.toString())
- }
-
- """
- Convenience variant that takes a DatagramPacket for specifying
- the target address and port.
- """
- private static void traceDiscoveryPacket(boolean received, str type, Map<str,str> attrs, InputPacket packet) {
- traceDiscoveryPacket(received, type, attrs, packet.getAddress(), packet.getPort())
- }
-}
-
-
-static {
- ServiceManager.addServiceProvider(new IServiceProvider() {
-
- public IService[] getLocalService(final IChannel channel) {
- channel.addCommandServer(locator, new IChannel.ICommandServer() {
- public void command(IToken token, str name, byte[] data) {
- locator.command((AbstractChannel)channel, token, name, data)
- }
- })
- return new IService[]{ locator }
- }
-
- public IService getServiceProxy(IChannel channel, str service_name) {
- return None
- }
- })
-}
+ else:
+ # Time stamp is in milliseconds
+ pass
+ if time_val < time_now - delta or time_val > time_now + delta:
+ msg = "Invalid slave info timestamp: %s -> %s" % (
+ timestamp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_val/1000.)))
+ self.log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), exceptions.Exception(msg))
+ time_val = time_now - locator.DATA_RETENTION_PERIOD / 2
+ self.__addSlave(addr, port, time_val, time_now)
+ if __TRACE_DISCOVERY:
+ self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", trace_map, p)
+ except exceptions.BaseException as x:
+ self.log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
+
+ def __handleReqSlavesPacket(self, p, sl, tm):
+ if __TRACE_DISCOVERY:
+ self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p)
+ if sl is not None: sl.last_req_slaves_time = tm
+ self.__sendSlavesInfo(p.getAddress(), p.getPort(), tm)
+
+ @classmethod
+ def getLocator(cls):
+ return cls.locator
+
+ def getPeers(self):
+ assert protocol.isDispatchThread()
+ return self.peers
+
+ def redirect(self, peer, done):
+ raise RuntimeError("Channel redirect cannot be done on local peer")
+
+ def sync(self, done):
+ raise RuntimeError("Channel sync cannot be done on local peer")
+ def addListener(self, listener):
+ assert listener is not None
+ assert protocol.isDispatchThread()
+ self.listeners.add(listener)
+
+ def removeListener(self, listener):
+ assert protocol.isDispatchThread()
+ self.listeners.remove(listener)
+
+ @classmethod
+ def __traceDiscoveryPacket(cls, received, type, attrs, addr, port=None):
+ """
+ Log that a TCF Discovery packet has be sent or received. The trace is
+ sent to stdout. This should be called only if the tracing has been turned
+ on.
+
+ @param received
+ True if the packet was sent, otherwise it was received
+ @param type
+ a string specifying the type of packet, e.g., "CONF_PEER_INFO"
+ @param attrs
+ a set of attributes relevant to the type of packet (typically
+ a peer's attributes)
+ @param addr
+ the network address the packet is being sent to
+ @param port
+ the port the packet is being sent to
+ """
+ assert __TRACE_DISCOVERY
+ if port is None:
+ # addr is a InputPacket
+ addr = addr.getAddress()
+ port = addr.getPort()
+ str = cStringIO.StringIO()
+ str.write(type)
+ str.write((" sent to ", " received from ")[received])
+ str.write("%s/%s" % (addr, port))
+ if attrs is not None:
+ for key, value in attrs.items():
+ str.write("\n\t%s=%s" % (key, value))
+ logging.trace(str.getvalue())
+
+class LocatorServiceProvider(services.ServiceProvider):
+ def getLocalService(self, _channel):
+ class CommandServer(channel.CommandServer):
+ def command(self, token, name, data):
+ LocatorService.locator.command(channel, token, name, data)
+ _channel.addCommandServer(LocatorService.locator, CommandServer())
+ return (LocatorService.locator,)
+
+services.addServiceProvider(LocatorServiceProvider())

Back to the top