Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnton Leherbauer2012-05-31 07:45:17 -0400
committerAnton Leherbauer2012-05-31 07:51:03 -0400
commitdbde99db2120a2ed07c1edac67d9a382dfa67b22 (patch)
treebb3191bbf479955ea8ef059d0cf83629bbd64a5f
parentfa416b0fd93637ee5e55e7bb00b0f2947f44cc4a (diff)
downloadorg.eclipse.tcf-dbde99db2120a2ed07c1edac67d9a382dfa67b22.tar.gz
org.eclipse.tcf-dbde99db2120a2ed07c1edac67d9a382dfa67b22.tar.xz
org.eclipse.tcf-dbde99db2120a2ed07c1edac67d9a382dfa67b22.zip
Fix locator service updating peer attributes.
-rw-r--r--python/src/tcf/peer.py113
-rw-r--r--python/src/tcf/services/local/LocatorService.py2
-rw-r--r--python/src/tcf/util/sync.py89
3 files changed, 134 insertions, 70 deletions
diff --git a/python/src/tcf/peer.py b/python/src/tcf/peer.py
index 85d595d96..b96cabd02 100644
--- a/python/src/tcf/peer.py
+++ b/python/src/tcf/peer.py
@@ -1,5 +1,5 @@
-# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# *****************************************************************************
+# * Copyright (c) 2011, 2012 Wind River Systems, Inc. and others.
# * All rights reserved. This program and the accompanying materials
# * are made available under the terms of the Eclipse Public License v1.0
# * which accompanies this distribution, and is available at
@@ -7,7 +7,7 @@
# *
# * Contributors:
# * Wind River Systems - initial API and implementation
-# *******************************************************************************
+# *****************************************************************************
"""
Both hosts and targets are represented by objects
@@ -16,16 +16,17 @@ target depending on services it implements.
List of currently known peers can be retrieved by
calling Locator.getPeers()
-A TCF agent houses one or more service managers. A service manager has a one or more
-services to expose to the world. The service manager creates one or more peers
-to represent itself, one for every access path the agent is
+A TCF agent houses one or more service managers. A service manager has a one or
+more services to expose to the world. The service manager creates one or more
+peers to represent itself, one for every access path the agent is
reachable by. For example, in agents accessible via TCP/IP, the
service manger would create a peer for every subnet it wants to participate in.
All peers of particular service manager represent identical sets of services.
"""
-import os, time, json
-from tcf import protocol, transport, services
+import os
+import time
+from tcf import protocol, transport, services, channel
from tcf.services import locator
# Peer unique ID
@@ -65,6 +66,7 @@ ATTR_IP_PORT = "Port"
class Peer(object):
def __init__(self, attrs):
self.attrs = attrs
+
def getAttributes(self):
"""@return map of peer attributes"""
return self.attrs
@@ -74,12 +76,14 @@ class Peer(object):
return self.attrs.get(ATTR_ID)
def getServiceManagerID(self):
- """@return service manager unique ID, same as getAttributes().get(ATTR_SERVICE_MANAGER_ID)"""
+ """@return service manager unique ID, same as
+ getAttributes().get(ATTR_SERVICE_MANAGER_ID)"""
assert protocol.isDispatchThread()
return self.attrs.get(ATTR_SERVICE_MANAGER_ID)
def getAgentID(self):
- """@return agent unique ID, same as getAttributes().get(ATTR_AGENT_ID)"""
+ """@return agent unique ID, same as
+ getAttributes().get(ATTR_AGENT_ID)"""
assert protocol.isDispatchThread()
return self.attrs.get(ATTR_AGENT_ID)
@@ -92,17 +96,19 @@ class Peer(object):
return self.attrs.get(ATTR_OS_NAME)
def getTransportName(self):
- """@return transport name, same as getAttributes().get(ATTR_TRANSPORT_NAME)"""
+ """@return transport name, same as
+ getAttributes().get(ATTR_TRANSPORT_NAME)"""
return self.attrs.get(ATTR_TRANSPORT_NAME)
def openChannel(self):
"""Open channel to communicate with this peer.
Note: the channel is not fully open yet when this method returns.
Its state is channel.STATE_OPENING.
- Protocol.ChannelOpenListener and IChannel.IChannelListener listeners will be called when
- the channel will change state to open or closed.
- Clients are supposed to register IChannel.IChannelListener right after calling openChannel(), or,
- at least, in same dispatch cycle. For example:
+ Protocol.ChannelOpenListener and IChannel.IChannelListener listeners
+ will be called when the channel will change state to open or closed.
+ Clients are supposed to register IChannel.IChannelListener right after
+ calling openChannel(), or, at least, in same dispatch cycle.
+ For example:
channel = peer.openChannel()
channel.addChannelListener(...)
"""
@@ -126,57 +132,60 @@ class TransientPeer(Peer):
def openChannel(self):
return transport.openChannel(self)
+
class LocalPeer(TransientPeer):
"""
LocalPeer object represents local end-point of TCF communication channel.
There should be exactly one such object in a TCF agent.
- The object can be used to open a loop-back communication channel that allows
- the agent to access its own services same way as remote services.
- Note that "local" here is relative to the agent, and not same as in "local host".
+ The object can be used to open a loop-back communication channel that
+ allows the agent to access its own services same way as remote services.
+ Note that "local" here is relative to the agent, and not same as in
+ "local host".
"""
def __init__(self):
super(LocalPeer, self).__init__(self.createAttributes())
def createAttributes(self):
attrs = {
- ATTR_ID : "TCFLocal",
- ATTR_SERVICE_MANAGER_ID : services.getServiceManagerID(),
- ATTR_AGENT_ID : protocol.getAgentID(),
- ATTR_NAME : "Local Peer",
- ATTR_OS_NAME : os.name,
- ATTR_TRANSPORT_NAME : "Loop"
+ ATTR_ID: "TCFLocal",
+ ATTR_SERVICE_MANAGER_ID: services.getServiceManagerID(),
+ ATTR_AGENT_ID: protocol.getAgentID(),
+ ATTR_NAME: "Local Peer",
+ ATTR_OS_NAME: os.name,
+ ATTR_TRANSPORT_NAME: "Loop"
}
- return attrs;
+ return attrs
+
class AbstractPeer(TransientPeer):
"""
Abstract implementation of IPeer interface.
Objects of this class are stored in Locator service peer table.
The class implements sending notification events to Locator listeners.
- See TransientPeer for IPeer objects that are not stored in the Locator table.
+ See TransientPeer for IPeer objects that are not stored in the Locator
+ table.
"""
- last_heart_beat_time = 0
-
def __init__(self, attrs):
super(AbstractPeer, self).__init__(attrs)
assert protocol.isDispatchThread()
- id = self.getID()
- assert id
+ self.last_heart_beat_time = 0
+ _id = self.getID()
+ assert _id
peers = protocol.getLocator().getPeers()
- if isinstance(peers.get(id), RemotePeer):
- peers.get(id).dispose()
- assert id not in peers
- peers[id] = self
+ if isinstance(peers.get(_id), RemotePeer):
+ peers.get(_id).dispose()
+ assert _id not in peers
+ peers[_id] = self
self.sendPeerAddedEvent()
def dispose(self):
assert protocol.isDispatchThread()
- id = self.getID()
- assert id
+ _id = self.getID()
+ assert _id
peers = protocol.getLocator().getPeers()
- assert peers.get(id) == self
- del peers[id]
+ assert peers.get(_id) == self
+ del peers[_id]
self.sendPeerRemovedEvent()
def onChannelTerminated(self):
@@ -206,11 +215,13 @@ class AbstractPeer(TransientPeer):
protocol.log("Unhandled exception in Locator listener", x)
try:
args = [self.rw_attrs]
- protocol.sendEvent(locator.NAME, "peerChanged", json.dumps(args))
+ protocol.sendEvent(locator.NAME, "peerChanged",
+ channel.toJSONSequence(args))
except IOError as x:
protocol.log("Locator: failed to send 'peerChanged' event", x)
self.last_heart_beat_time = timeVal
- elif self.last_heart_beat_time + locator.DATA_RETENTION_PERIOD / 4 < timeVal:
+ elif self.last_heart_beat_time + locator.DATA_RETENTION_PERIOD / 4 \
+ < timeVal:
for l in protocol.getLocator().getListeners():
try:
l.peerHeartBeat(attrs.get(ATTR_ID))
@@ -218,9 +229,11 @@ class AbstractPeer(TransientPeer):
protocol.log("Unhandled exception in Locator listener", x)
try:
args = [self.rw_attrs.get(ATTR_ID)]
- protocol.sendEvent(locator.NAME, "peerHeartBeat", json.dumps(args))
+ protocol.sendEvent(locator.NAME, "peerHeartBeat",
+ channel.toJSONSequence(args))
except IOError as x:
- protocol.log("Locator: failed to send 'peerHeartBeat' event", x)
+ protocol.log(
+ "Locator: failed to send 'peerHeartBeat' event", x)
self.last_heart_beat_time = timeVal
def sendPeerAddedEvent(self):
@@ -231,7 +244,8 @@ class AbstractPeer(TransientPeer):
protocol.log("Unhandled exception in Locator listener", x)
try:
args = [self.rw_attrs]
- protocol.sendEvent(locator.NAME, "peerAdded", json.dumps(args))
+ protocol.sendEvent(locator.NAME, "peerAdded",
+ channel.toJSONSequence(args))
except IOError as x:
protocol.log("Locator: failed to send 'peerAdded' event", x)
self.last_heart_beat_time = int(time.time() * 1000)
@@ -244,24 +258,25 @@ class AbstractPeer(TransientPeer):
protocol.log("Unhandled exception in Locator listener", x)
try:
args = [self.rw_attrs.get(ATTR_ID)]
- protocol.sendEvent(locator.NAME, "peerRemoved", json.dumps(args))
+ protocol.sendEvent(locator.NAME, "peerRemoved",
+ channel.toJSONSequence(args))
except IOError as x:
protocol.log("Locator: failed to send 'peerRemoved' event", x)
class RemotePeer(AbstractPeer):
"""
- RemotePeer objects represent TCF agents that Locator service discovered on local network.
+ RemotePeer objects represent TCF agents that Locator service discovered on
+ local network.
This includes both local host agents and remote host agents.
Note that "remote peer" means any peer accessible over network,
it does not imply the agent is running on a "remote host".
- If an agent binds multiple network interfaces or multiple ports, it can be represented by
- multiple RemotePeer objects - one per each network address/port combination.
+ If an agent binds multiple network interfaces or multiple ports, it can be
+ represented by multiple RemotePeer objects - one per each network
+ address/port combination.
RemotePeer objects life cycle is managed by Locator service.
"""
- last_update_time = 0
-
def __init__(self, attrs):
super(RemotePeer, self).__init__(attrs)
self.last_update_time = int(time.time() * 1000)
diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py
index fe368d185..468d0a06b 100644
--- a/python/src/tcf/services/local/LocatorService.py
+++ b/python/src/tcf/services/local/LocatorService.py
@@ -722,7 +722,7 @@ class LocatorService(locator.LocatorService):
ok = True
break
if ok:
- _peer = self.peers.get(id)
+ _peer = self.peers.get(_id)
if isinstance(_peer, peer.RemotePeer):
_peer.updateAttributes(attrs)
elif _peer is None:
diff --git a/python/src/tcf/util/sync.py b/python/src/tcf/util/sync.py
index 400c96526..71ed27bb6 100644
--- a/python/src/tcf/util/sync.py
+++ b/python/src/tcf/util/sync.py
@@ -1,5 +1,5 @@
-# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# *****************************************************************************
+# * Copyright (c) 2011, 2012 Wind River Systems, Inc. and others.
# * All rights reserved. This program and the accompanying materials
# * are made available under the terms of the Eclipse Public License v1.0
# * which accompanies this distribution, and is available at
@@ -7,24 +7,29 @@
# *
# * Contributors:
# * Wind River Systems - initial API and implementation
-# *******************************************************************************
+# *****************************************************************************
-import threading, types
+import threading
+import types
from tcf import protocol
from tcf.channel.Command import Command
+
class DispatchWrapper(object):
"Simple wrapper for attribute access and invocation on TCF dispatch thread"
def __init__(self, inner):
self.inner = inner
+
def __getattr__(self, attr):
val = protocol.invokeAndWait(getattr, self.inner, attr)
if type(val) in (types.FunctionType, types.MethodType):
return DispatchWrapper(val)
return val
+
def __call__(self, *args, **kwargs):
return protocol.invokeAndWait(self.inner, *args, **kwargs)
+
class CommandControl(object):
"""Provides a simple interface to send commands to remote services
and receive results.
@@ -36,7 +41,7 @@ class CommandControl(object):
Examples:
# send command, but don't wait for result:
> cmd.RunControl.suspend("system")
- # getE() returns the result and raises an exception in case of error response:
+ # getE() returns the result and raises an exception in case of error:
> result = cmd.RunControl.getChildren(None).getE()
# to get error and result at the same time, use this form:
> error, result = cmd.Breakpoints.getIDs()
@@ -49,10 +54,12 @@ class CommandControl(object):
self._queue = []
self._pending = {}
self._complete = []
+
def __getattr__(self, attr):
val = getattr(self._channel, attr, None)
if val:
- if self._interactive and type(val) in (types.FunctionType, types.MethodType):
+ if self._interactive and type(val) in (types.FunctionType,
+ types.MethodType):
val = DispatchWrapper(val)
return val
services = protocol.invokeAndWait(self._channel.getRemoteServices)
@@ -60,12 +67,15 @@ class CommandControl(object):
return services
if attr in services:
return ServiceWrapper(self, attr)
- raise AttributeError("Unknown service: %s. Use one of %s" % (attr, services))
+ raise AttributeError("Unknown service: %s. Use one of %s" % (
+ attr, services))
+
def invoke(self, service, command, *args, **kwargs):
cmd = None
if not protocol.isDispatchThread():
if not kwargs.get("async"):
- cmd = protocol.invokeAndWait(self._invoke, service, command, *args, **kwargs)
+ cmd = protocol.invokeAndWait(self._invoke, service, command,
+ *args, **kwargs)
if cmd and self._interactive:
return cmd.getE()
else:
@@ -75,52 +85,68 @@ class CommandControl(object):
protocol.invokeLater(self._processQueue)
return
return cmd
+
def _invoke(self, service, command, *args, **kwargs):
cmdCtrl = self
+
class GenericCommand(Command):
_result = None
- def done(self, error, args):
+ def done(self, error, args): #@IgnorePep8
resultArgs = None
if not error and args:
- # error result is usually in args[0], but there are exceptions
+ # error result is usually in args[0],
+ # but there are exceptions
if service == "StackTrace" and command == "getContext":
error = self.toError(args[1])
resultArgs = (args[0],)
elif service == "Expressions" and command == "evaluate":
error = self.toError(args[1])
resultArgs = (args[0], args[2])
- elif service == "FileSystem" and command in ('read', 'readdir', 'roots'):
+ elif service == "FileSystem" and command in (
+ 'read', 'readdir', 'roots'):
error = self.toError(args[1])
resultArgs = (args[0],) + tuple(args[2:])
- elif service == "Diagnostics" and command.startswith("echo"):
+ elif service == "Streams" and command == 'read':
+ error = self.toError(args[1])
+ resultArgs = (args[0],) + tuple(args[2:])
+ elif service == "Diagnostics" and \
+ command.startswith("echo"):
resultArgs = (args[0],)
else:
error = self.toError(args[0])
resultArgs = args[1:]
cmdCtrl._doneCommand(self.token, error, resultArgs)
+
def wait(self, timeout=None):
cmdCtrl._waitForCommand(self.token, timeout)
+
def cancel(self):
return protocol.invokeAndWait(self.token.cancel)
+
def getResult(self, wait=None):
if wait:
cmdCtrl._waitForCommand(self.token)
return self._result
+
def getE(self):
r = self.getResult(True)
if r.error:
raise r.error
return r.args
+
def get(self):
r = self.getResult(True)
return r.args
+
def getError(self):
r = self.getResult(True)
return r.error
+
def __str__(self):
if self._async:
return self.getCommandString()
return str(self.get())
+
def __iter__(self):
return iter(self.getResult(True))
cmd = GenericCommand(self._channel, service, command, args)
@@ -128,6 +154,7 @@ class CommandControl(object):
cmd._onDone = kwargs.get("onDone")
self._addPending(cmd)
return cmd
+
def _processQueue(self):
assert protocol.isDispatchThread()
with self._lock:
@@ -135,44 +162,57 @@ class CommandControl(object):
service, command, args, kwargs = cmd
self._invoke(service, command, *args, **kwargs)
del self._queue[:]
+
def _addPending(self, cmd):
with self._lock:
self._pending[cmd.token.id] = cmd
self._lock.notifyAll()
+
def _doneCommand(self, token, error, args):
with self._lock:
cmd = self._pending.get(token.id)
assert cmd
del self._pending[token.id]
cmd._result = CommandResult(token, error, args)
- if cmd._async: self._complete.append(cmd)
+ if cmd._async:
+ self._complete.append(cmd)
isDone = self.isDone()
- if isDone: self._lock.notifyAll()
+ if isDone:
+ self._lock.notifyAll()
if cmd._onDone:
- if args is None: args = (None,)
+ if args is None:
+ args = (None,)
cmd._onDone(error, *args)
- if isDone and self._onDone: self._onDone()
+ if isDone and self._onDone:
+ self._onDone()
+
def isDone(self):
with self._lock:
return not self._pending and not self._queue
+
def wait(self, timeout=None):
assert not protocol.isDispatchThread()
with self._lock:
while self._pending or self._queue:
self._lock.wait(timeout)
- if timeout: break
+ if timeout:
+ break
+
def _waitForCommand(self, token, timeout=None):
assert not protocol.isDispatchThread()
with self._lock:
while token.id in self._pending:
self._lock.wait(timeout)
- if timeout: break
+ if timeout:
+ break
else:
if self._queue:
self._lock.wait(timeout)
while token.id in self._pending:
self._lock.wait(timeout)
- if timeout: break
+ if timeout:
+ break
+
def cancel(self):
if not protocol.isDispatchThread():
protocol.invokeLater(self.cancel)
@@ -181,6 +221,7 @@ class CommandControl(object):
for cmd in self._pending.values():
cmd.token.cancel()
del self._queue[:]
+
def getResult(self, wait=True):
if wait:
self.wait()
@@ -189,6 +230,7 @@ class CommandControl(object):
del self._complete[:]
return result
+
class CommandResult(object):
def __init__(self, token, error, args):
self.token = token
@@ -197,26 +239,33 @@ class CommandResult(object):
if args and len(args) == 1:
args = args[0]
self.args = args
+
def __str__(self):
if self.error:
return "[%s] error: %s" % (self.token.id, self.error)
return "[%s] result: %s" % (self.token.id, self.args)
__repr__ = __str__
+
def __iter__(self):
yield self.error
yield self.args
+
class ServiceWrapper(object):
def __init__(self, control, service):
self._control = control
self._service = service
+
def __getattr__(self, attr):
return CommandWrapper(self._control, self._service, attr)
+
class CommandWrapper(object):
def __init__(self, control, service, command):
self._control = control
self._service = service
self._command = command
+
def __call__(self, *args, **kwargs):
- return self._control.invoke(self._service, self._command, *args, **kwargs)
+ return self._control.invoke(
+ self._service, self._command, *args, **kwargs)

Back to the top