Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraleherbau2011-06-08 02:23:07 -0400
committeraleherbau2011-06-08 02:23:07 -0400
commit4f2de8ba162fb69486e22cc6abbf2d8a5e3d4c35 (patch)
tree7a22fcc8d1eed6444b586b522979a6ca9dde7334 /python/src
parentcad9f08c15c6ca56b74d98b6e291cfafecbd5815 (diff)
downloadorg.eclipse.tcf-4f2de8ba162fb69486e22cc6abbf2d8a5e3d4c35.tar.gz
org.eclipse.tcf-4f2de8ba162fb69486e22cc6abbf2d8a5e3d4c35.tar.xz
org.eclipse.tcf-4f2de8ba162fb69486e22cc6abbf2d8a5e3d4c35.zip
TCF Python: Added support for CONF_PEERS_REMOVED
Diffstat (limited to 'python/src')
-rw-r--r--python/src/tcf/services/local/LocatorService.py119
-rw-r--r--python/src/tcf/services/locator.py1
2 files changed, 75 insertions, 45 deletions
diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py
index ebd2b4813..e2c3bf0bc 100644
--- a/python/src/tcf/services/local/LocatorService.py
+++ b/python/src/tcf/services/local/LocatorService.py
@@ -432,7 +432,8 @@ class LocatorService(locator.LocatorService):
"CONF_REQ_INFO",
"CONF_PEER_INFO",
"CONF_REQ_SLAVES",
- "CONF_SLAVES_INFO"
+ "CONF_SLAVES_INFO",
+ "CONF_PEERS_REMOVED"
]
def __sendDatagramPacket(self, subnet, size, addr, port):
@@ -450,6 +451,10 @@ class LocatorService(locator.LocatorService):
map = None
if self.out_buf[4] == locator.CONF_PEER_INFO:
map = self.__parsePeerAttributes(self.out_buf, 8)
+ elif self.out_buf[4] == locator.CONF_SLAVES_INFO:
+ map = self.__parseIDs(self.out_buf, size)
+ elif self.out_buf[4] == locator.CONF_PEERS_REMOVED:
+ map = self.__parseIDs(self.out_buf, size)
self.__traceDiscoveryPacket(False, self.packetTypes[self.out_buf[4]], map, addr, port)
except Exception as x:
self._log("Cannot send datagram packet to %s" % addr, x)
@@ -458,12 +463,10 @@ class LocatorService(locator.LocatorService):
def __parsePeerAttributes(self, data, size):
"""
- Parse peer attributes in CONF_INFO_PEER packet data
+ Parse peer attributes in CONF_PEER_INFO packet data
- @param data
- the packet section that contain the peer attributes
- @param size
- the number of bytes in [data] that contain peer attributes
+ @param data - the packet data
+ @param size - the packet size
@return a map containing the attributes
"""
map = {}
@@ -484,6 +487,29 @@ class LocatorService(locator.LocatorService):
map[key] = val
return map
+ def __parseIDs(self, data, size):
+ """
+ Parse list of IDs in CONF_SLAVES_INFO and CONF_PEERS_REMOVED packet data.
+
+ @param data - the packet data
+ @param size - the packet size
+ @return a map containing the IDs
+ """
+ cnt = 0
+ map = {}
+ s = data[8:size - 8].decode("UTF-8")
+ l = len(s)
+ i = 0
+ while i < l:
+ i0 = i
+ while i < l and s[i] != '\0': i += 1
+ if i > i0:
+ id = s[i0:i]
+ map[str(cnt)] = id
+ cnt += 1
+ while i < l and s[i] == '\0': i += 1
+ return map
+
def __sendPeersRequest(self, addr, port):
self.out_buf[4] = locator.CONF_REQ_INFO
for subnet in self.subnets:
@@ -593,28 +619,31 @@ class LocatorService(locator.LocatorService):
remote_port = p.getPort()
remote_address = p.getAddress()
if self.__isRemote(remote_address, remote_port):
- sl = None
- if remote_port != DISCOVEY_PORT:
- sl = self.__addSlave(remote_address, remote_port, tm, tm)
- code = ord(buf[4])
- if code == locator.CONF_PEER_INFO:
- self.__handlePeerInfoPacket(p)
- elif code == locator.CONF_REQ_INFO:
- self.__handleReqInfoPacket(p, sl, tm)
- elif code == locator.CONF_SLAVES_INFO:
- self.__handleSlavesInfoPacket(p, tm)
- elif code == locator.CONF_REQ_SLAVES:
- self.__handleReqSlavesPacket(p, sl, tm)
- for subnet in self.subnets:
- if not subnet.contains(remote_address): continue
- delay = locator.DATA_RETENTION_PERIOD / 3
- if remote_port != DISCOVEY_PORT: delay = locator.DATA_RETENTION_PERIOD / 32
- elif subnet.address != remote_address: delay = locator.DATA_RETENTION_PERIOD / 2
- if subnet.last_slaves_req_time + delay <= tm:
- self.__sendSlavesRequest(subnet, remote_address, remote_port)
- subnet.last_slaves_req_time = tm
- if subnet.address == remote_address and remote_port == DISCOVEY_PORT:
- self.last_master_packet_time = tm
+ if buf[4] == locator.CONF_PEERS_REMOVED:
+ self.__handlePeerRemovedPacket(p)
+ else:
+ sl = None
+ if remote_port != DISCOVEY_PORT:
+ sl = self.__addSlave(remote_address, remote_port, tm, tm)
+ code = ord(buf[4])
+ if code == locator.CONF_PEER_INFO:
+ self.__handlePeerInfoPacket(p)
+ elif code == locator.CONF_REQ_INFO:
+ self.__handleReqInfoPacket(p, sl, tm)
+ elif code == locator.CONF_SLAVES_INFO:
+ self.__handleSlavesInfoPacket(p, tm)
+ elif code == locator.CONF_REQ_SLAVES:
+ self.__handleReqSlavesPacket(p, sl, tm)
+ for subnet in self.subnets:
+ if not subnet.contains(remote_address): continue
+ delay = locator.DATA_RETENTION_PERIOD / 3
+ if remote_port != DISCOVEY_PORT: delay = locator.DATA_RETENTION_PERIOD / 32
+ elif subnet.address != remote_address: delay = locator.DATA_RETENTION_PERIOD / 2
+ if subnet.last_slaves_req_time + delay <= tm:
+ self.__sendSlavesRequest(subnet, remote_address, remote_port)
+ subnet.last_slaves_req_time = tm
+ if subnet.address == remote_address and remote_port == DISCOVEY_PORT:
+ self.last_master_packet_time = tm
except Exception as x:
self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
@@ -650,15 +679,11 @@ class LocatorService(locator.LocatorService):
def __handleSlavesInfoPacket(self, p, time_now):
try:
- trace_map = None # used for tracing only
- slave_index = 0 # used for tracing only
- if __TRACE_DISCOVERY__:
- trace_map = {}
-
- s = p.getData()[8:p.getLength() - 8].decode("UTF-8")
- l = len(s)
- i = 0
- while i < l:
+ map = self.__parseIDs(p.getData(), p.getLength())
+ if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", map, p)
+ for s in map.values():
+ i = 0
+ l = len(s)
time0 = i
while i < l and s[i] != ':' and s[i] != '\0': i += 1
time1 = i
@@ -670,13 +695,9 @@ class LocatorService(locator.LocatorService):
host0 = i
while i < l and s[i] != '\0': i += 1
host1 = i
- if i < l and s[i] == '\0': i += 1
port = int(s[port0:port1])
timestamp = s[time0:time1]
host = s[host0:host1]
- if __TRACE_DISCOVERY__:
- trace_map["slave[%d]" % slave_index] = "%s:%d:%s" % (timestamp, port, host)
- slave_index += 1
if port != DISCOVEY_PORT:
addr = self.__getInetAddress(host)
if addr is not None:
@@ -700,17 +721,25 @@ class LocatorService(locator.LocatorService):
self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), Exception(msg))
time_val = time_now - locator.DATA_RETENTION_PERIOD / 2
self.__addSlave(addr, port, time_val, time_now)
- if __TRACE_DISCOVERY__:
- self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", trace_map, p)
except Exception as x:
self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
def __handleReqSlavesPacket(self, p, sl, tm):
- if __TRACE_DISCOVERY__:
- self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p)
+ if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p)
if sl is not None: sl.last_req_slaves_time = tm
self.__sendSlavesInfo(p.getAddress(), p.getPort(), tm)
+ def __handlePeerRemovedPacket(self, p):
+ try:
+ map = self.__parseIDs(p.getData(), p.getLength())
+ if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_PEERS_REMOVED", map, p)
+ for id in map.values():
+ _peer = self.peers.get(id)
+ if isinstance(_peer, peer.RemotePeer): _peer.dispose()
+ except Exception as x:
+ self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
+
+
@classmethod
def getLocator(cls):
return cls.locator
diff --git a/python/src/tcf/services/locator.py b/python/src/tcf/services/locator.py
index 8ffe42be2..10199a140 100644
--- a/python/src/tcf/services/locator.py
+++ b/python/src/tcf/services/locator.py
@@ -33,6 +33,7 @@ CONF_REQ_INFO = 1
CONF_PEER_INFO = 2
CONF_REQ_SLAVES = 3
CONF_SLAVES_INFO = 4
+CONF_PEERS_REMOVED = 5
NAME = "Locator"

Back to the top