diff options
Diffstat (limited to 'python/src/tcf/peer.py')
-rw-r--r-- | python/src/tcf/peer.py | 548 |
1 files changed, 274 insertions, 274 deletions
diff --git a/python/src/tcf/peer.py b/python/src/tcf/peer.py index f93092001..85d595d96 100644 --- a/python/src/tcf/peer.py +++ b/python/src/tcf/peer.py @@ -1,274 +1,274 @@ -# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
-# * All rights reserved. This program and the accompanying materials
-# * are made available under the terms of the Eclipse Public License v1.0
-# * which accompanies this distribution, and is available at
-# * http://www.eclipse.org/legal/epl-v10.html
-# *
-# * Contributors:
-# * Wind River Systems - initial API and implementation
-# *******************************************************************************
-
-"""
-Both hosts and targets are represented by objects
-implementing IPeer interface. A peer can act as host or
-target depending on services it implements.
-List of currently known peers can be retrieved by
-calling Locator.getPeers()
-
-A TCF agent houses one or more service managers. A service manager has a one or more
-services to expose to the world. The service manager creates one or more peers
-to represent itself, one for every access path the agent is
-reachable by. For example, in agents accessible via TCP/IP, the
-service manger would create a peer for every subnet it wants to participate in.
-All peers of particular service manager represent identical sets of services.
-"""
-
-import os, time, json
-from tcf import protocol, transport, services
-from tcf.services import locator
-
-# Peer unique ID
-ATTR_ID = "ID"
-
-# Unique ID of service manager that is represented by this peer
-ATTR_SERVICE_MANAGER_ID = "ServiceManagerID"
-
-# Agent unique ID
-ATTR_AGENT_ID = "AgentID"
-
-# Peer name
-ATTR_NAME = "Name"
-
-# Name of the peer operating system
-ATTR_OS_NAME = "OSName"
-
-# Transport name, for example TCP, SSL
-ATTR_TRANSPORT_NAME = "TransportName"
-
-# If present, indicates that the peer can forward traffic to other peers
-ATTR_PROXY = "Proxy"
-
-# Host DNS name or IP address
-ATTR_IP_HOST = "Host"
-
-# Optional list of host aliases
-ATTR_IP_ALIASES = "Aliases"
-
-# Optional list of host addresses
-ATTR_IP_ADDRESSES = "Addresses"
-
-# IP port number, must be decimal number
-ATTR_IP_PORT = "Port"
-
-
-class Peer(object):
- def __init__(self, attrs):
- self.attrs = attrs
- def getAttributes(self):
- """@return map of peer attributes"""
- return self.attrs
-
- def getID(self):
- """@return peer unique ID, same as getAttributes().get(ATTR_ID)"""
- return self.attrs.get(ATTR_ID)
-
- def getServiceManagerID(self):
- """@return service manager unique ID, same as getAttributes().get(ATTR_SERVICE_MANAGER_ID)"""
- assert protocol.isDispatchThread()
- return self.attrs.get(ATTR_SERVICE_MANAGER_ID)
-
- def getAgentID(self):
- """@return agent unique ID, same as getAttributes().get(ATTR_AGENT_ID)"""
- assert protocol.isDispatchThread()
- return self.attrs.get(ATTR_AGENT_ID)
-
- def getName(self):
- """@return peer name, same as getAttributes().get(ATTR_NAME)"""
- return self.attrs.get(ATTR_NAME)
-
- def getOSName(self):
- """@return agent OS name, same as getAttributes().get(ATTR_OS_NAME)"""
- return self.attrs.get(ATTR_OS_NAME)
-
- def getTransportName(self):
- """@return transport name, same as getAttributes().get(ATTR_TRANSPORT_NAME)"""
- return self.attrs.get(ATTR_TRANSPORT_NAME)
-
- def openChannel(self):
- """Open channel to communicate with this peer.
- Note: the channel is not fully open yet when this method returns.
- Its state is channel.STATE_OPENING.
- Protocol.ChannelOpenListener and IChannel.IChannelListener listeners will be called when
- the channel will change state to open or closed.
- Clients are supposed to register IChannel.IChannelListener right after calling openChannel(), or,
- at least, in same dispatch cycle. For example:
- channel = peer.openChannel()
- channel.addChannelListener(...)
- """
- raise RuntimeError("Abstract method")
-
-
-class TransientPeer(Peer):
- """
- Transient implementation of IPeer interface.
- Objects of this class are not tracked by Locator service.
- See AbstractPeer for IPeer objects that should go into the Locator table.
- """
- def __init__(self, attrs):
- self.rw_attrs = {}
- self.rw_attrs.update(attrs)
- # TODO readonly map
- ro_attrs = {}
- ro_attrs.update(self.rw_attrs)
- super(TransientPeer, self).__init__(ro_attrs)
-
- def openChannel(self):
- return transport.openChannel(self)
-
-class LocalPeer(TransientPeer):
- """
- LocalPeer object represents local end-point of TCF communication channel.
- There should be exactly one such object in a TCF agent.
- The object can be used to open a loop-back communication channel that allows
- the agent to access its own services same way as remote services.
- Note that "local" here is relative to the agent, and not same as in "local host".
- """
- def __init__(self):
- super(LocalPeer, self).__init__(self.createAttributes())
-
- def createAttributes(self):
- attrs = {
- ATTR_ID : "TCFLocal",
- ATTR_SERVICE_MANAGER_ID : services.getServiceManagerID(),
- ATTR_AGENT_ID : protocol.getAgentID(),
- ATTR_NAME : "Local Peer",
- ATTR_OS_NAME : os.name,
- ATTR_TRANSPORT_NAME : "Loop"
- }
- return attrs;
-
-class AbstractPeer(TransientPeer):
- """
- Abstract implementation of IPeer interface.
- Objects of this class are stored in Locator service peer table.
- The class implements sending notification events to Locator listeners.
- See TransientPeer for IPeer objects that are not stored in the Locator table.
- """
-
- last_heart_beat_time = 0
-
- def __init__(self, attrs):
- super(AbstractPeer, self).__init__(attrs)
- assert protocol.isDispatchThread()
- id = self.getID()
- assert id
- peers = protocol.getLocator().getPeers()
- if isinstance(peers.get(id), RemotePeer):
- peers.get(id).dispose()
- assert id not in peers
- peers[id] = self
- self.sendPeerAddedEvent()
-
- def dispose(self):
- assert protocol.isDispatchThread()
- id = self.getID()
- assert id
- peers = protocol.getLocator().getPeers()
- assert peers.get(id) == self
- del peers[id]
- self.sendPeerRemovedEvent()
-
- def onChannelTerminated(self):
- # A channel to this peer was terminated:
- # not delaying next heart beat helps client to recover much faster.
- self.last_heart_beat_time = 0
-
- def updateAttributes(self, attrs):
- equ = True
- assert attrs.get(ATTR_ID) == self.rw_attrs.get(ATTR_ID)
- for key in self.rw_attrs.keys():
- if self.rw_attrs.get(key) != attrs.get(key):
- equ = False
- break
- for key in attrs.keys():
- if attrs.get(key) != self.rw_attrs.get(key):
- equ = False
- break
- timeVal = int(time.time() * 1000)
- if not equ:
- self.rw_attrs.clear()
- self.rw_attrs.update(attrs)
- for l in protocol.getLocator().getListeners():
- try:
- l.peerChanged(self)
- except Exception as x:
- protocol.log("Unhandled exception in Locator listener", x)
- try:
- args = [self.rw_attrs]
- protocol.sendEvent(locator.NAME, "peerChanged", json.dumps(args))
- except IOError as x:
- protocol.log("Locator: failed to send 'peerChanged' event", x)
- self.last_heart_beat_time = timeVal
- elif self.last_heart_beat_time + locator.DATA_RETENTION_PERIOD / 4 < timeVal:
- for l in protocol.getLocator().getListeners():
- try:
- l.peerHeartBeat(attrs.get(ATTR_ID))
- except Exception as x:
- protocol.log("Unhandled exception in Locator listener", x)
- try:
- args = [self.rw_attrs.get(ATTR_ID)]
- protocol.sendEvent(locator.NAME, "peerHeartBeat", json.dumps(args))
- except IOError as x:
- protocol.log("Locator: failed to send 'peerHeartBeat' event", x)
- self.last_heart_beat_time = timeVal
-
- def sendPeerAddedEvent(self):
- for l in protocol.getLocator().getListeners():
- try:
- l.peerAdded(self)
- except Exception as x:
- protocol.log("Unhandled exception in Locator listener", x)
- try:
- args = [self.rw_attrs]
- protocol.sendEvent(locator.NAME, "peerAdded", json.dumps(args))
- except IOError as x:
- protocol.log("Locator: failed to send 'peerAdded' event", x)
- self.last_heart_beat_time = int(time.time() * 1000)
-
- def sendPeerRemovedEvent(self):
- for l in protocol.getLocator().getListeners():
- try:
- l.peerRemoved(self.rw_attrs.get(ATTR_ID))
- except Exception as x:
- protocol.log("Unhandled exception in Locator listener", x)
- try:
- args = [self.rw_attrs.get(ATTR_ID)]
- protocol.sendEvent(locator.NAME, "peerRemoved", json.dumps(args))
- except IOError as x:
- protocol.log("Locator: failed to send 'peerRemoved' event", x)
-
-
-class RemotePeer(AbstractPeer):
- """
- RemotePeer objects represent TCF agents that Locator service discovered on local network.
- This includes both local host agents and remote host agents.
- Note that "remote peer" means any peer accessible over network,
- it does not imply the agent is running on a "remote host".
- If an agent binds multiple network interfaces or multiple ports, it can be represented by
- multiple RemotePeer objects - one per each network address/port combination.
- RemotePeer objects life cycle is managed by Locator service.
- """
-
- last_update_time = 0
-
- def __init__(self, attrs):
- super(RemotePeer, self).__init__(attrs)
- self.last_update_time = int(time.time() * 1000)
-
- def updateAttributes(self, attrs):
- super(RemotePeer, self).updateAttributes(attrs)
- self.last_update_time = int(time.time() * 1000)
-
- def getLastUpdateTime(self):
- return self.last_update_time
+# ******************************************************************************* +# * Copyright (c) 2011 Wind River Systems, Inc. and others. +# * All rights reserved. This program and the accompanying materials +# * are made available under the terms of the Eclipse Public License v1.0 +# * which accompanies this distribution, and is available at +# * http://www.eclipse.org/legal/epl-v10.html +# * +# * Contributors: +# * Wind River Systems - initial API and implementation +# ******************************************************************************* + +""" +Both hosts and targets are represented by objects +implementing IPeer interface. A peer can act as host or +target depending on services it implements. +List of currently known peers can be retrieved by +calling Locator.getPeers() + +A TCF agent houses one or more service managers. A service manager has a one or more +services to expose to the world. The service manager creates one or more peers +to represent itself, one for every access path the agent is +reachable by. For example, in agents accessible via TCP/IP, the +service manger would create a peer for every subnet it wants to participate in. +All peers of particular service manager represent identical sets of services. +""" + +import os, time, json +from tcf import protocol, transport, services +from tcf.services import locator + +# Peer unique ID +ATTR_ID = "ID" + +# Unique ID of service manager that is represented by this peer +ATTR_SERVICE_MANAGER_ID = "ServiceManagerID" + +# Agent unique ID +ATTR_AGENT_ID = "AgentID" + +# Peer name +ATTR_NAME = "Name" + +# Name of the peer operating system +ATTR_OS_NAME = "OSName" + +# Transport name, for example TCP, SSL +ATTR_TRANSPORT_NAME = "TransportName" + +# If present, indicates that the peer can forward traffic to other peers +ATTR_PROXY = "Proxy" + +# Host DNS name or IP address +ATTR_IP_HOST = "Host" + +# Optional list of host aliases +ATTR_IP_ALIASES = "Aliases" + +# Optional list of host addresses +ATTR_IP_ADDRESSES = "Addresses" + +# IP port number, must be decimal number +ATTR_IP_PORT = "Port" + + +class Peer(object): + def __init__(self, attrs): + self.attrs = attrs + def getAttributes(self): + """@return map of peer attributes""" + return self.attrs + + def getID(self): + """@return peer unique ID, same as getAttributes().get(ATTR_ID)""" + return self.attrs.get(ATTR_ID) + + def getServiceManagerID(self): + """@return service manager unique ID, same as getAttributes().get(ATTR_SERVICE_MANAGER_ID)""" + assert protocol.isDispatchThread() + return self.attrs.get(ATTR_SERVICE_MANAGER_ID) + + def getAgentID(self): + """@return agent unique ID, same as getAttributes().get(ATTR_AGENT_ID)""" + assert protocol.isDispatchThread() + return self.attrs.get(ATTR_AGENT_ID) + + def getName(self): + """@return peer name, same as getAttributes().get(ATTR_NAME)""" + return self.attrs.get(ATTR_NAME) + + def getOSName(self): + """@return agent OS name, same as getAttributes().get(ATTR_OS_NAME)""" + return self.attrs.get(ATTR_OS_NAME) + + def getTransportName(self): + """@return transport name, same as getAttributes().get(ATTR_TRANSPORT_NAME)""" + return self.attrs.get(ATTR_TRANSPORT_NAME) + + def openChannel(self): + """Open channel to communicate with this peer. + Note: the channel is not fully open yet when this method returns. + Its state is channel.STATE_OPENING. + Protocol.ChannelOpenListener and IChannel.IChannelListener listeners will be called when + the channel will change state to open or closed. + Clients are supposed to register IChannel.IChannelListener right after calling openChannel(), or, + at least, in same dispatch cycle. For example: + channel = peer.openChannel() + channel.addChannelListener(...) + """ + raise RuntimeError("Abstract method") + + +class TransientPeer(Peer): + """ + Transient implementation of IPeer interface. + Objects of this class are not tracked by Locator service. + See AbstractPeer for IPeer objects that should go into the Locator table. + """ + def __init__(self, attrs): + self.rw_attrs = {} + self.rw_attrs.update(attrs) + # TODO readonly map + ro_attrs = {} + ro_attrs.update(self.rw_attrs) + super(TransientPeer, self).__init__(ro_attrs) + + def openChannel(self): + return transport.openChannel(self) + +class LocalPeer(TransientPeer): + """ + LocalPeer object represents local end-point of TCF communication channel. + There should be exactly one such object in a TCF agent. + The object can be used to open a loop-back communication channel that allows + the agent to access its own services same way as remote services. + Note that "local" here is relative to the agent, and not same as in "local host". + """ + def __init__(self): + super(LocalPeer, self).__init__(self.createAttributes()) + + def createAttributes(self): + attrs = { + ATTR_ID : "TCFLocal", + ATTR_SERVICE_MANAGER_ID : services.getServiceManagerID(), + ATTR_AGENT_ID : protocol.getAgentID(), + ATTR_NAME : "Local Peer", + ATTR_OS_NAME : os.name, + ATTR_TRANSPORT_NAME : "Loop" + } + return attrs; + +class AbstractPeer(TransientPeer): + """ + Abstract implementation of IPeer interface. + Objects of this class are stored in Locator service peer table. + The class implements sending notification events to Locator listeners. + See TransientPeer for IPeer objects that are not stored in the Locator table. + """ + + last_heart_beat_time = 0 + + def __init__(self, attrs): + super(AbstractPeer, self).__init__(attrs) + assert protocol.isDispatchThread() + id = self.getID() + assert id + peers = protocol.getLocator().getPeers() + if isinstance(peers.get(id), RemotePeer): + peers.get(id).dispose() + assert id not in peers + peers[id] = self + self.sendPeerAddedEvent() + + def dispose(self): + assert protocol.isDispatchThread() + id = self.getID() + assert id + peers = protocol.getLocator().getPeers() + assert peers.get(id) == self + del peers[id] + self.sendPeerRemovedEvent() + + def onChannelTerminated(self): + # A channel to this peer was terminated: + # not delaying next heart beat helps client to recover much faster. + self.last_heart_beat_time = 0 + + def updateAttributes(self, attrs): + equ = True + assert attrs.get(ATTR_ID) == self.rw_attrs.get(ATTR_ID) + for key in self.rw_attrs.keys(): + if self.rw_attrs.get(key) != attrs.get(key): + equ = False + break + for key in attrs.keys(): + if attrs.get(key) != self.rw_attrs.get(key): + equ = False + break + timeVal = int(time.time() * 1000) + if not equ: + self.rw_attrs.clear() + self.rw_attrs.update(attrs) + for l in protocol.getLocator().getListeners(): + try: + l.peerChanged(self) + except Exception as x: + protocol.log("Unhandled exception in Locator listener", x) + try: + args = [self.rw_attrs] + protocol.sendEvent(locator.NAME, "peerChanged", json.dumps(args)) + except IOError as x: + protocol.log("Locator: failed to send 'peerChanged' event", x) + self.last_heart_beat_time = timeVal + elif self.last_heart_beat_time + locator.DATA_RETENTION_PERIOD / 4 < timeVal: + for l in protocol.getLocator().getListeners(): + try: + l.peerHeartBeat(attrs.get(ATTR_ID)) + except Exception as x: + protocol.log("Unhandled exception in Locator listener", x) + try: + args = [self.rw_attrs.get(ATTR_ID)] + protocol.sendEvent(locator.NAME, "peerHeartBeat", json.dumps(args)) + except IOError as x: + protocol.log("Locator: failed to send 'peerHeartBeat' event", x) + self.last_heart_beat_time = timeVal + + def sendPeerAddedEvent(self): + for l in protocol.getLocator().getListeners(): + try: + l.peerAdded(self) + except Exception as x: + protocol.log("Unhandled exception in Locator listener", x) + try: + args = [self.rw_attrs] + protocol.sendEvent(locator.NAME, "peerAdded", json.dumps(args)) + except IOError as x: + protocol.log("Locator: failed to send 'peerAdded' event", x) + self.last_heart_beat_time = int(time.time() * 1000) + + def sendPeerRemovedEvent(self): + for l in protocol.getLocator().getListeners(): + try: + l.peerRemoved(self.rw_attrs.get(ATTR_ID)) + except Exception as x: + protocol.log("Unhandled exception in Locator listener", x) + try: + args = [self.rw_attrs.get(ATTR_ID)] + protocol.sendEvent(locator.NAME, "peerRemoved", json.dumps(args)) + except IOError as x: + protocol.log("Locator: failed to send 'peerRemoved' event", x) + + +class RemotePeer(AbstractPeer): + """ + RemotePeer objects represent TCF agents that Locator service discovered on local network. + This includes both local host agents and remote host agents. + Note that "remote peer" means any peer accessible over network, + it does not imply the agent is running on a "remote host". + If an agent binds multiple network interfaces or multiple ports, it can be represented by + multiple RemotePeer objects - one per each network address/port combination. + RemotePeer objects life cycle is managed by Locator service. + """ + + last_update_time = 0 + + def __init__(self, attrs): + super(RemotePeer, self).__init__(attrs) + self.last_update_time = int(time.time() * 1000) + + def updateAttributes(self, attrs): + super(RemotePeer, self).updateAttributes(attrs) + self.last_update_time = int(time.time() * 1000) + + def getLastUpdateTime(self): + return self.last_update_time |