diff options
34 files changed, 5644 insertions, 0 deletions
diff --git a/python/.project b/python/.project new file mode 100644 index 000000000..ea91adff9 --- /dev/null +++ b/python/.project @@ -0,0 +1,17 @@ +<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>org.eclipse.tcf.python</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.python.pydev.PyDevBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.python.pydev.pythonNature</nature>
+ </natures>
+</projectDescription>
diff --git a/python/.pydevproject b/python/.pydevproject new file mode 100644 index 000000000..e85ac4aa6 --- /dev/null +++ b/python/.pydevproject @@ -0,0 +1,10 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<?eclipse-pydev version="1.0"?>
+
+<pydev_project>
+<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
+<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.6</pydev_property>
+<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
+<path>/org.eclipse.tcf.python/src</path>
+</pydev_pathproperty>
+</pydev_project>
diff --git a/python/about.html b/python/about.html new file mode 100644 index 000000000..9f6c454f0 --- /dev/null +++ b/python/about.html @@ -0,0 +1,28 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" + "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> +<html xmlns="http://www.w3.org/1999/xhtml"> +<head> +<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1"/> +<title>About</title> +</head> +<body lang="EN-US"> +<h2>About This Content</h2> + +<p>March 31, 2011</p> +<h3>License</h3> + +<p>The Eclipse Foundation makes available all content in this plug-in ("Content"). Unless otherwise +indicated below, the Content is provided to you under the terms and conditions of the +Eclipse Public License Version 1.0 ("EPL"). A copy of the EPL is available +at <a href="http://www.eclipse.org/legal/epl-v10.html">http://www.eclipse.org/legal/epl-v10.html</a>. +For purposes of the EPL, "Program" will mean the Content.</p> + +<p>If you did not receive this Content directly from the Eclipse Foundation, the Content is +being redistributed by another party ("Redistributor") and different terms and conditions may +apply to your use of any object code in the Content. Check the Redistributor's license that was +provided with the Content. If no such license exists, contact the Redistributor. Unless otherwise +indicated below, the terms and conditions of the EPL still apply to any source code in the Content +and such source code may be obtained at <a href="http://www.eclipse.org/">http://www.eclipse.org</a>.</p> + +</body> +</html>
\ No newline at end of file diff --git a/python/src/tcf/EventQueue.py b/python/src/tcf/EventQueue.py new file mode 100644 index 000000000..1c8a3e90c --- /dev/null +++ b/python/src/tcf/EventQueue.py @@ -0,0 +1,78 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import threading, exceptions
+import protocol
+
+class EventQueue(object):
+
+ def __init__(self):
+ self.__thread = threading.Thread(target=self, name="TCF Event Dispatcher")
+ self.__thread.daemon = True
+ self.__is_waiting = False
+ self.__is_shutdown = False
+ self.__lock = threading.Condition()
+ self.__queue = []
+
+ def start(self):
+ self.__thread.start()
+
+ def shutdown(self):
+ try:
+ with self.__lock:
+ self.__is_shutdown = True
+ if self.__is_waiting:
+ self.__is_waiting = False
+ self.__lock.notifyAll()
+ self.__thread.join()
+ except exceptions.Exception as e:
+ protocol.log("Failed to shutdown TCF event dispatch thread", e)
+
+ def isShutdown(self):
+ with self._lock:
+ return self.__is_shutdown
+
+ def __error(self, x):
+ protocol.log("Unhandled exception in TCF event dispatch", x)
+
+ def __call__(self):
+ while True:
+ try:
+ with self.__lock:
+ while not self.__queue:
+ if self.__is_shutdown: return
+ self.__is_waiting = True
+ self.__lock.wait()
+ r, args, kwargs = self.__queue.pop(0)
+ r(*args, **kwargs)
+ except exceptions.Exception as x:
+ self.__error(x)
+
+ def invokeLater(self, r, *args, **kwargs):
+ assert r;
+ with self.__lock:
+ if self.__is_shutdown: raise exceptions.RuntimeError("TCF event dispatcher has shut down")
+ self.__queue.append((r, args, kwargs))
+ if self.__is_waiting:
+ self.__is_waiting = False
+ self.__lock.notifyAll()
+
+ def isDispatchThread(self):
+ return threading.currentThread() is self.__thread
+
+ def getCongestion(self):
+ with self.__lock:
+ job_cnt = 0
+ l0 = job_cnt / 10 - 100
+ l1 = len(self.__queue) / 10 - 100
+ if l1 > l0: l0 = l1
+ if l0 > 100: l0 = 100
+ return l0
diff --git a/python/src/tcf/__init__.py b/python/src/tcf/__init__.py new file mode 100644 index 000000000..2031cb285 --- /dev/null +++ b/python/src/tcf/__init__.py @@ -0,0 +1,57 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+"""
+TCF - Target Communication Framework
+"""
+
+import types, exceptions
+import protocol, peer, channel
+from util import task
+
+__all__ = ('connect')
+
+def connect(params, wait=True):
+ """Connect to peer. Argument is a string of the form <transport>:<host>:<port>,
+ e.g. "TCP:127.0.0.1:1534" """
+ if type(params) is types.StringType:
+ params = _parse_params(params)
+ elif type(params) is not types.DictType:
+ raise exceptions.TypeError("Expected string or dict")
+ p = peer.TransientPeer(params)
+ if wait:
+ c = task.Task(_openChannel, p).get()
+ else:
+ c = protocol.invokeAndWait(p.openChannel)
+ return c
+
+def _openChannel(p, done=None):
+ assert protocol.isDispatchThread()
+ c = p.openChannel()
+ if done is None: return
+ class ChannelListener(channel.ChannelListener):
+ def onChannelOpened(self):
+ c.removeChannelListener(self)
+ done(None, c)
+ def onChannelClosed(self, error):
+ done(error, None)
+ c.addChannelListener(ChannelListener())
+
+def _parse_params(paramStr):
+ args = paramStr.split(":")
+ if len(args) != 3:
+ raise ValueError("Expected format: <transport>:<host>:<port>")
+ transp, host, port = args
+ return {
+ peer.ATTR_IP_HOST : host,
+ peer.ATTR_IP_PORT : port,
+ peer.ATTR_TRANSPORT_NAME : transp
+ }
diff --git a/python/src/tcf/channel/AbstractChannel.py b/python/src/tcf/channel/AbstractChannel.py new file mode 100644 index 000000000..f343d58ef --- /dev/null +++ b/python/src/tcf/channel/AbstractChannel.py @@ -0,0 +1,753 @@ +# ******************************************************************************* +# * Copyright (c) 2011 Wind River Systems, Inc. and others. +# * All rights reserved. This program and the accompanying materials +# * are made available under the terms of the Eclipse Public License v1.0 +# * which accompanies this distribution, and is available at +# * http://www.eclipse.org/legal/epl-v10.html +# * +# * Contributors: +# * Wind River Systems - initial API and implementation +# ******************************************************************************* + +import sys, threading, time, types +from exceptions import Exception, IOError +from tcf import protocol, transport, services, peer, errors +from tcf.services import locator +from tcf.channel import STATE_CLOSED, STATE_OPEN, STATE_OPENING +from tcf.channel import Token, fromJSONSequence, toJSONSequence + +EOS = -1 # End Of Stream +EOM = -2 # End Of Message + + +class Message(object): + def __init__(self, typeCode): + if type(typeCode) is types.IntType: + typeCode = chr(typeCode) + self.type = typeCode + self.service = None + self.name = None + self.data = None + self.is_canceled = None + self.is_sent = None + self.token = None + self.trace = () + def __str__(self): + return "%s %s %s" % (self.type, self.service, self.name) + +class ReaderThread(threading.Thread): + def __init__(self, channel, handleInput): + super(ReaderThread, self).__init__(name="TCF Reader Thread") + self.channel = channel + self.handleInput = handleInput + self.buf = bytearray() + self.eos_err_report = None + self.daemon = True + + def error(self): + raise IOError("Protocol syntax error") + + def readBytes(self, end, buf=None): + if buf is None: + buf = bytearray() + while True: + n = self.channel.read() + if n <= 0: + if n == end: break + if n == EOM: raise IOError("Unexpected end of message") + if n < 0: raise IOError("Communication channel is closed by remote peer") + buf.append(n) + return buf + + def readString(self): + del self.buf[:] + bytes = self.readBytes(0, self.buf) + return bytes.decode("UTF8") + + def run(self): + try: + while True: + n = self.channel.read() + if n == EOM: continue + if n == EOS: + try: + self.eos_err_report = self.readBytes(EOM) + reportLen = len(self.eos_err_report) + if reportLen == 0 or reportLen == 1 and self.eos_err_report[0] == 0: + self.eos_err_report = None + except: + pass + break + msg = Message(n) + if self.channel.read() != 0: self.error() + typeCode = msg.type + if typeCode == 'C': + msg.token = Token(self.readBytes(0)) + msg.service = self.readString() + msg.name = self.readString() + msg.data = self.readBytes(EOM) + elif typeCode in 'PRN': + msg.token = Token(self.readBytes(0)) + msg.data = self.readBytes(EOM) + elif typeCode == 'E': + msg.service = self.readString() + msg.name = self.readString() + msg.data = self.readBytes(EOM) + elif typeCode == 'F': + msg.data = self.readBytes(EOM) + else: + self.error() + protocol.invokeLater(self.handleInput, msg) + delay = self.channel.local_congestion_level + if delay > 0: time.sleep(delay / 1000.0) + protocol.invokeLater(self.handleEOS) + except Exception as x: + try: + x.tb = sys.exc_info()[2] + protocol.invokeLater(self.channel.terminate, x) + except: + # TCF event dispatcher has shut down + pass + + def handleEOS(self): + if not self.channel.out_tokens and not self.eos_err_report: + self.channel.close() + else: + x = IOError("Communication channel is closed by remote peer") + if self.eos_err_report: + try: + args = fromJSONSequence(self.eos_err_report) + if len(args) > 0 and args[0] is not None: + x.caused_by = Exception(errors.toErrorString(args[0])) + except IOError: + pass + self.channel.terminate(x) + + +class AbstractChannel(object): + """ + AbstractChannel implements communication link connecting two end points (peers). + The channel asynchronously transmits messages: commands, results and events. + + Clients can subclass AbstractChannel to support particular transport (wire) protocol. + Also, see StreamChannel for stream oriented transport protocols. + """ + + def __init__(self, remote_peer, local_peer=None): + self.remote_peer = remote_peer + self.local_peer = local_peer # TODO + self.inp_thread = ReaderThread(self, self.__handleInput) + self.out_thread = threading.Thread(target=self.__write_output,name="TCF Channel Transmitter") + self.out_thread.daemon = True + self.out_tokens = {} + self.out_queue = [] + self.out_lock = threading.Condition() + self.pending_command_limit = 32 + self.remote_service_by_class = {} + self.local_service_by_name = {} + self.remote_service_by_name = {} + self.channel_listeners = [] + self.event_listeners = {} + self.command_servers = {} + self.redirect_queue = [] + self.redirect_command = None + self.notifying_channel_opened = False + self.registered_with_trasport = False + self.state = STATE_OPENING + self.proxy = None + self.zero_copy = False + + self.local_congestion_level = -100 + self.remote_congestion_level = -100 + self.local_congestion_cnt = 0 + self.local_congestion_time = 0 + self.local_service_by_class = {} + self.trace_listeners = [] + + def __write_output(self): + try: + while True: + msg = None + last = False + with self.out_lock: + while len(self.out_queue) == 0: + self.out_lock.wait() + msg = self.out_queue.pop(0) + if not msg: break + last = len(self.out_queue) == 0 + if msg.is_canceled: + if last: self.flush() + continue + msg.is_sent = True + if msg.trace: + protocol.invokeLater(self.__traceMessageSent, msg) + self.write(msg.type) + self.write(0) + if msg.token: + self.write(msg.token.id) + self.write(0) + if msg.service: + self.write(msg.service.encode("UTF8")) + self.write(0) + if msg.name: + self.write(msg.name.encode("UTF8")) + self.write(0) + if msg.data: + self.write(msg.data) + self.write(EOM) + delay = 0 + level = self.remote_congestion_level + if level > 0: delay = level * 10 + if last or delay > 0: self.flush() + if delay > 0: time.sleep(delay / 1000.0) + #else yield() + self.write(EOS) + self.write(EOM) + self.flush() + except Exception as x: + try: + protocol.invokeLater(self.terminate, x) + except: + # TCF event dispatcher has shut down + pass + + def __traceMessageSent(self, m): + for l in m.trace: + try: + id = None + if m.token is not None: + id = m.token.getID() + l.onMessageSent(m.type, id, m.service, m.name, m.data) + except Exception as x: + protocol.log("Exception in channel listener", x) + + def start(self): + assert protocol.isDispatchThread() + protocol.invokeLater(self.__initServices) + self.inp_thread.start() + self.out_thread.start() + + def __initServices(self): + try: + if self.proxy: return + if self.state == STATE_CLOSED: return + services.onChannelCreated(self, self.local_service_by_name) + self.__makeServiceByClassMap(self.local_service_by_name, self.local_service_by_class) + args = self.local_service_by_name.keys() + self.sendEvent(protocol.getLocator(), "Hello", toJSONSequence((args,))) + except IOError as x: + self.terminate(x) + + def redirect_id(self, peer_id): + """ + Redirect this channel to given peer using this channel remote peer locator service as a proxy. + @param peer_id - peer that will become new remote communication endpoint of this channel + """ + map = {} + map[peer.ATTR_ID] = peer_id + self.redirect(map) + + def redirect(self, peer_attrs): + """ + Redirect this channel to given peer using this channel remote peer locator service as a proxy. + @param peer_attrs - peer that will become new remote communication endpoint of this channel + """ + channel = self + assert protocol.isDispatchThread() + if self.state == STATE_OPENING: + self.redirect_queue.append(peer_attrs) + else: + assert self.state == STATE_OPEN + assert self.redirect_command is None + try: + l = self.remote_service_by_class.get(locator.LocatorService) + if not l: raise IOError("Cannot redirect channel: peer " + + self.remote_peer.getID() + " has no locator service") + peer_id = peer_attrs.get(peer.ATTR_ID) + if peer_id and len(peer_attrs) == 1: + peer = l.getPeers().get(peer_id) + if not peer: + # Peer not found, must wait for a while until peer is discovered or time out + class Callback(object): + found = None + def __call__(self): + if self.found: return + self.channel.terminate(Exception("Peer " + peer_id + " not found")) + cb = Callback() + protocol.invokeLaterWithDelay(locator.DATA_RETENTION_PERIOD / 3, cb) + class Listener(locator.LocatorListener): + def peerAdded(self, peer): + if peer.getID() == peer_id: + cb.found = True + channel.state = STATE_OPEN + l.removeListener(self) + channel.redirect_id(peer_id) + l.addListener(Listener()) + else: + class DoneRedirect(locator.DoneRedirect): + def doneRedirect(self, token, exc): + assert channel.redirect_command is token + channel.redirect_command = None + if channel.state != STATE_OPENING: return + if exc: channel.terminate(exc) + channel.remote_peer = peer + channel.remote_service_by_class.clear() + channel.remote_service_by_name.clear() + channel.event_listeners.clear() + self.redirect_command = l.redirect(peer_id, DoneRedirect()) + else: + class TransientPeer(peer.TransientPeer): + def __init__(self, peer_attrs, parent): + super(TransientPeer, self).__init__(peer_attrs) + self.parent = parent + def openChannel(self): + c = self.parent.openChannel() + c.redirect(peer_attrs) + class DoneRedirect(locator.DoneRedirect): + def doneRedirect(self, token, exc): + assert channel.redirect_command is token + channel.redirect_command = None + if channel.state != STATE_OPENING: return + if exc: channel.terminate(exc) + parent = channel.remote_peer + channel.remote_peer = TransientPeer(peer_attrs, parent) + channel.remote_service_by_class.clear() + channel.remote_service_by_name.clear() + channel.event_listeners.clear() + self.redirect_command = l.redirect(peer_attrs, DoneRedirect()) + self.state = STATE_OPENING + except Exception as x: + self.terminate(x) + + def __makeServiceByClassMap(self, by_name, by_class): + for service in by_name.values(): + for clazz in service.__class__.__bases__: + if clazz == services.Service: continue + # TODO + # if (!IService.class.isAssignableFrom(fs)) continue + by_class[clazz] = service + + def getState(self): + return self.state + + def addChannelListener(self, listener): + assert protocol.isDispatchThread() + assert listener + self.channel_listeners.append(listener) + + def removeChannelListener(self, listener): + assert protocol.isDispatchThread() + self.channel_listeners.remove(listener) + + def addTraceListener(self, listener): + if self.trace_listeners is None: + self.trace_listeners = [] + else: + self.trace_listeners = self.trace_listeners[:] + self.trace_listeners.append(listener) + + def removeTraceListener(self, listener): + self.trace_listeners = self.trace_listeners[:] + self.trace_listeners.remove(listener) + if len(self.trace_listeners) == 0: self.trace_listeners = None + + def addEventListener(self, service, listener): + assert protocol.isDispatchThread() + svc_name = str(service) + listener.svc_name = svc_name + list = self.event_listeners.get(svc_name) or [] + list.append(listener) + self.event_listeners[svc_name] = list + + def removeEventListener(self, service, listener): + assert protocol.isDispatchThread() + svc_name = str(service) + list = self.event_listeners.get(svc_name) + if not list: return + for i in range(len(list)): + if list[i] is listener: + if len(list) == 1: + del self.event_listeners[svc_name] + else: + del list[i] + return + + def addCommandServer(self, service, listener): + assert protocol.isDispatchThread() + svc_name = str(service) + if self.command_servers.get(svc_name): + raise Exception("Only one command server per service is allowed") + self.command_servers[svc_name] = listener + + def removeCommandServer(self, service, listener): + assert protocol.isDispatchThread() + svc_name = str(service) + if self.command_servers.get(svc_name) is not listener: + raise Exception("Invalid command server") + del self.command_servers[svc_name] + + def close(self): + assert protocol.isDispatchThread() + if self.state == STATE_CLOSED: return + try: + self.__sendEndOfStream(10000) + self._close(None) + except Exception as x: + self._close(x) + + def terminate(self, error): + assert protocol.isDispatchThread() + if self.state == STATE_CLOSED: return + try: + self.__sendEndOfStream(500) + except Exception as x: + if not error: error = x + self._close(error) + + def __sendEndOfStream(self, timeout): + with self.out_lock: + del self.out_queue[:] + self.out_queue.append(None) + self.out_lock.notify() + self.out_thread.join(timeout) + + def _close(self, error): + assert self.state != STATE_CLOSED + self.state = STATE_CLOSED + # Closing channel underlying streams can block for a long time, + # so it needs to be done by a background thread. + thread = threading.Thread(target=self.stop, name="TCF Channel Cleanup") + thread.daemon = True + thread.start() + if error and isinstance(self.remote_peer, peer.AbstractPeer): + self.remote_peer.onChannelTerminated() + if self.registered_with_trasport: + self.registered_with_trasport = False + transport.channelClosed(self, error) + if self.proxy: + try: + self.proxy.onChannelClosed(error) + except Exception as x: + protocol.log("Exception in channel listener", x) + channel = self + class Runnable(object): + def __call__(self): + if channel.out_tokens: + x = None + if isinstance(error, Exception): x = error + elif error: x = Exception(error) + else: x = IOError("Channel is closed") + for msg in channel.out_tokens.values(): + try: + s = str(msg) + if len(s) > 72: s = s[:72] + "...]" + y = IOError("Command " + s + " aborted") +# y.initCause(x) + msg.token.getListener().terminated(msg.token, y) + except Exception as e: + protocol.log("Exception in command listener", e) + channel.out_tokens.clear() + if channel.channel_listeners: + for l in channel.channel_listeners: + if not l: break + try: + l.onChannelClosed(error) + except Exception as x: + protocol.log("Exception in channel listener", x) + elif error: + protocol.log("TCF channel terminated", error) + if channel.trace_listeners: + for l in channel.trace_listeners: + try: + l.onChannelClosed(error) + except Exception as x: + protocol.log("Exception in channel listener", x) + protocol.invokeLater(Runnable()) + + def getCongestion(self): + assert protocol.isDispatchThread() + level = len(self.out_tokens) * 100 / self.pending_command_limit - 100 + if self.remote_congestion_level > level: level = self.remote_congestion_level + if level > 100: level = 100 + return level + + def getLocalPeer(self): + assert protocol.isDispatchThread() + return self.local_peer + + def getRemotePeer(self): + assert protocol.isDispatchThread() + return self.remote_peer + + def getLocalServices(self): + assert protocol.isDispatchThread() + assert self.state != STATE_OPENING + return self.local_service_by_name.keys() + + def getRemoteServices(self): + assert protocol.isDispatchThread() + assert self.state != STATE_OPENING + return self.remote_service_by_name.keys() + + def getLocalService(self, cls_or_name): + assert protocol.isDispatchThread() + assert self.state != STATE_OPENING + if type(cls_or_name) == types.StringType: + return self.local_service_by_name.get(cls_or_name) + else: + return self.local_service_by_class.get(cls_or_name) + + def getRemoteService(self, cls_or_name): + assert protocol.isDispatchThread() + assert self.state != STATE_OPENING + if type(cls_or_name) == types.StringType: + return self.remote_service_by_name.get(cls_or_name) + else: + return self.remote_service_by_class.get(cls_or_name) + + def setServiceProxy(self, service_interface, service_proxy): + if not self.notifying_channel_opened: raise Exception("setServiceProxe() can be called only from channel open call-back") + if not isinstance(self.remote_service_by_name.get(service_proxy.getName()), services.GenericProxy): raise Exception("Proxy already set") + if self.remote_service_by_class.get(service_interface): raise Exception("Proxy already set") + self.remote_service_by_class[service_interface] = service_proxy + self.remote_service_by_name[service_proxy.getName()] = service_proxy + + def setProxy(self, proxy, services): + self.proxy = proxy + self.sendEvent(protocol.getLocator(), "Hello", toJSONSequence((services,))) + self.local_service_by_class.clear() + self.local_service_by_name.clear() + + def addToOutQueue(self, msg): + msg.trace = self.trace_listeners + with self.out_lock: + self.out_queue.append(msg) + self.out_lock.notify() + + def sendCommand(self, service, name, args, listener): + assert protocol.isDispatchThread() + if self.state == STATE_OPENING: raise Exception("Channel is waiting for Hello message") + if self.state == STATE_CLOSED: raise Exception("Channel is closed") + msg = Message('C') + msg.service = str(service) + msg.name = name + msg.data = args + channel = self + class CancelableToken(Token): + def __init__(self, listener): + super(CancelableToken, self).__init__(listener=listener) + def cancel(self): + assert protocol.isDispatchThread() + if channel.state != STATE_OPEN: return False + with channel.out_lock: + if msg.is_sent: return False + msg.is_canceled = True + del channel.out_tokens[msg.token.getID()] + return True + token = CancelableToken(listener) + msg.token = token + self.out_tokens[token.getID()] = msg + self.addToOutQueue(msg) + return token + + def sendProgress(self, token, results): + assert protocol.isDispatchThread() + if self.state != STATE_OPEN: raise Exception("Channel is closed") + msg = Message('P') + msg.data = results + msg.token = token + self.addToOutQueue(msg) + + def sendResult(self, token, results): + assert protocol.isDispatchThread() + if self.state != STATE_OPEN: raise Exception("Channel is closed") + msg = Message('R') + msg.data = results + msg.token = token + self.addToOutQueue(msg) + + def rejectCommand(self, token): + assert protocol.isDispatchThread() + if self.state != STATE_OPEN: raise Exception("Channel is closed") + msg = Message('N') + msg.token = token + self.addToOutQueue(msg) + + def sendEvent(self, service, name, args): + assert protocol.isDispatchThread() + if not (self.state == STATE_OPEN or self.state == STATE_OPENING and isinstance(service, locator.LocatorService)): + raise Exception("Channel is closed") + msg = Message('E') + msg.service = str(service) + msg.name = name + msg.data = args + self.addToOutQueue(msg) + + def isZeroCopySupported(self): + return self.zero_copy + + def __traceMessageReceived(self, m): + for l in self.trace_listeners: + try: + id = None + if m.token is not None: + id = m.token.getID() + l.onMessageReceived(m.type, id, m.service, m.name, m.data) + except Exception as x: + protocol.log("Exception in channel listener", x) + + def __handleInput(self, msg): + assert protocol.isDispatchThread() + if self.state == STATE_CLOSED: return + if self.trace_listeners: + self.__traceMessageReceived(msg) + try: + token = None + typeCode = msg.type + if typeCode in 'PRN': + token_id = msg.token.getID() + cmd = self.out_tokens.get(token_id) + if cmd is None: raise Exception("Invalid token received: " + token_id) + if typeCode != 'P': + del self.out_tokens[token_id] + token = cmd.token + if typeCode == 'C': + if self.state == STATE_OPENING: + raise IOError("Received command " + msg.service + "." + msg.name + " before Hello message") + if self.proxy: + self.proxy.onCommand(msg.token, msg.service, msg.name, msg.data) + else: + token = msg.token + cmds = self.command_servers.get(msg.service) + if cmds: + cmds.command(token, msg.name, msg.data) + else: + self.rejectCommand(token) + elif typeCode == 'P': + token.getListener().progress(token, msg.data) + self.__sendCongestionLevel() + elif typeCode == 'R': + token.getListener().result(token, msg.data) + self.__sendCongestionLevel() + elif typeCode == 'N': + token.getListener().terminated(token, errors.ErrorReport( + "Command is not recognized", errors.TCF_ERROR_INV_COMMAND)) + elif typeCode == 'E': + hello = msg.service == locator.NAME and msg.name == "Hello" + if hello: + self.remote_service_by_name.clear() + self.remote_service_by_class.clear() + data = fromJSONSequence(msg.data)[0] + services.onChannelOpened(self, data, self.remote_service_by_name) + self.__makeServiceByClassMap(self.remote_service_by_name, self.remote_service_by_class) + self.zero_copy = self.remote_service_by_name.has_key("ZeroCopy") + if self.proxy and self.state == STATE_OPEN: + self.proxy.onEvent(msg.service, msg.name, msg.data) + elif hello: + assert self.state == STATE_OPENING + self.state = STATE_OPEN + assert self.redirect_command is None + if self.redirect_queue: + self.redirect(self.redirect_queue.pop(0)) + else: + self.notifying_channel_opened = True + if not self.registered_with_trasport: + transport.channelOpened(self) + self.registered_with_trasport = True + for l in self.channel_listeners: + if not l: break + try: + l.onChannelOpened() + except Exception as x: + protocol.log("Exception in channel listener", x) + self.notifying_channel_opened = False + else: + list = self.event_listeners.get(msg.service) + if list: + for l in list: + l.event(msg.name, msg.data) + self.__sendCongestionLevel() + elif typeCode == 'F': + len = len(msg.data) + if len > 0 and msg.data[len - 1] == '\0': len -= 1 + self.remote_congestion_level = int(msg.data) + else: + assert False + except Exception as x: + x.tb = sys.exc_info()[2] + self.terminate(x) + + def __sendCongestionLevel(self): + self.local_congestion_cnt += 1 + if self.local_congestion_cnt < 8: return + self.local_congestion_cnt = 0 + if self.state != STATE_OPEN: return + timeVal = int(time.time()) + if timeVal - self.local_congestion_time < 500: return + assert protocol.isDispatchThread() + level = protocol.getCongestionLevel() + if level == self.local_congestion_level: return + i = (level - self.local_congestion_level) / 8 + if i != 0: level = self.local_congestion_level + i + self.local_congestion_time = timeVal + with self.out_lock: + msg = None + if self.out_queue: + msg = self.out_queue[0] + if msg is None or msg.type != 'F': + msg = Message('F') + self.out_queue.insert(0, msg) + self.out_lock.notify() + data = "%i\0" % self.local_congestion_level + msg.data = data + msg.trace = self.trace_listeners + self.local_congestion_level = level + + def read(self): + """ + Read one byte from the channel input stream. + @return next data byte or EOS (-1) if end of stream is reached, + or EOM (-2) if end of message is reached. + @raises IOError + """ + raise NotImplementedError("Abstract method") + + def writeByte(self, n): + """ + Write one byte into the channel output stream. + The method argument can be one of two special values: + EOS (-1) end of stream marker + EOM (-2) end of message marker. + The stream can put the byte into a buffer instead of transmitting it right away. + @param n - the data byte. + @raises IOError + """ + raise NotImplementedError("Abstract method") + + def flush(self): + """ + Flush the channel output stream. + All buffered data should be transmitted immediately. + @raises IOError + """ + raise NotImplementedError("Abstract method") + + def stop(self): + """ + Stop (close) channel underlying streams. + If a thread is blocked by read() or write(), it should be + resumed (or interrupted). + @raises IOError + """ + raise NotImplementedError("Abstract method") + + def write(self, buf): + """ + Write array of bytes into the channel output stream. + The stream can put bytes into a buffer instead of transmitting it right away. + @param buf + @raises IOError + """ + assert threading.currentThread() == self.out_thread + for i in buf: + self.writeByte(ord(buf[i]) & 0xff) diff --git a/python/src/tcf/channel/ChannelTCP.py b/python/src/tcf/channel/ChannelTCP.py new file mode 100644 index 000000000..77d165bce --- /dev/null +++ b/python/src/tcf/channel/ChannelTCP.py @@ -0,0 +1,90 @@ +# ******************************************************************************* +# * Copyright (c) 2011 Wind River Systems, Inc. and others. +# * All rights reserved. This program and the accompanying materials +# * are made available under the terms of the Eclipse Public License v1.0 +# * which accompanies this distribution, and is available at +# * http://www.eclipse.org/legal/epl-v10.html +# * +# * Contributors: +# * Wind River Systems - initial API and implementation +# ******************************************************************************* + +import socket, types +from tcf import protocol +from StreamChannel import StreamChannel + +class ChannelTCP(StreamChannel): + "ChannelTCP is a channel implementation that works on top of TCP sockets as a transport." + + def __init__(self, remote_peer, host, port): + super(ChannelTCP, self).__init__(remote_peer) + self.closed = False + self.started = False + channel = self + class CreateSocket(object): + def __call__(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + channel.socket = sock + channel._onSocketConnected(None) + protocol.invokeLater(CreateSocket()) + + def _onSocketConnected(self, x): + if x: + self.terminate(x) + self.closed = True + if self.closed: + try: + if self.socket: + self.socket.close() + except socket.error as y: + protocol.log("Cannot close socket", y) + else: + self.started = True + self.start() + + def get(self): + if self.closed: return -1 + try: + return ord(self.socket.recv(1)) + except socket.error as x: + if self.closed: return -1 + raise x + + def getBuf(self, buf): + if self.closed: return -1 + try: + return self.socket.recv_into(buf) + except socket.error as x: + if self.closed: return -1 + raise x + + def put(self, b): + if self.closed: return + t = type(b) + if t is types.StringType: + s = b + elif t is types.IntType: + s = chr(b) + else: + raise "Illegal argument type: %s" % t + self.socket.send(s) + + def putBuf(self, buf): + if self.closed: return + t = type(buf) + if t is types.StringType: + s = buf + else: + s = str(buf) + self.socket.sendall(s) + + def flush(self): + pass + + def stop(self): + self.closed = True + if self.started: + self.socket.close() diff --git a/python/src/tcf/channel/Command.py b/python/src/tcf/channel/Command.py new file mode 100644 index 000000000..5150ad0b7 --- /dev/null +++ b/python/src/tcf/channel/Command.py @@ -0,0 +1,123 @@ +# ******************************************************************************* +# * Copyright (c) 2011 Wind River Systems, Inc. and others. +# * All rights reserved. This program and the accompanying materials +# * are made available under the terms of the Eclipse Public License v1.0 +# * which accompanies this distribution, and is available at +# * http://www.eclipse.org/legal/epl-v10.html +# * +# * Contributors: +# * Wind River Systems - initial API and implementation +# ******************************************************************************* + +import json, exceptions, cStringIO +from tcf import protocol, errors, services +from tcf.channel import Token, toJSONSequence, fromJSONSequence + +class Command(object): + """ + This is utility class that helps to implement sending a command and receiving + command result over TCF communication channel. The class uses JSON to encode + command arguments and to decode result data. + + The class also provides support for TCF standard error report encoding. + + Clients are expected to subclass <code>Command</code> and override <code>done</code> method. + + Note: most clients don't need to handle protocol commands directly and + can use service APIs instead. Service API does all command encoding/decoding + for a client. + + Typical usage example: + + def getContext(self, id, done): + class GetContextCommand(Command): + def done(self, error, args): + ctx = None + if not error: + assert len(args) == 2 + error = self.toError(args[0]) + if args[1]: ctx = Context(args[1]) + done.doneGetContext(self.token, error, ctx) + command = GetContextCommand(self.channel, self, "getContext", [id]) + return command.token + """ + __done = False + + def __init__(self, channel, service, command, args): + if isinstance(service, services.Service): + service = service.getName() + self.service = service + self.command = command + self.args = args + t = None + try: + # TODO zero_copy + #zero_copy = channel.isZeroCopySupported() + t = channel.sendCommand(service, command, toJSONSequence(args), self) + except exceptions.Exception as y: + t = Token() + protocol.invokeLater(self._error, y) + self.token = t + + def _error(self, error): + assert not self.__done + self.__done = True + self.done(error, None) + + def progress(self, token, data): + assert self.token is token + + def result(self, token, data): + assert self.token is token + error = None + args = None + try: + args = fromJSONSequence(data) + except exceptions.Exception as e: + error = e + assert not self.__done + self.__done = True + self.done(error, args) + + def terminated(self, token, error): + assert self.token is token + assert not self.__done + self.__done = True + self.done(error, None) + + def done(self, error, args): + raise exceptions.NotImplementedError("Abstract method") + + def getCommandString(self): + buf = cStringIO.StringIO() + buf.write(self.service) + buf.write(" ") + buf.write(self.command) + if self.args is not None: + i = 0 + for arg in self.args: + if i == 0: + buf.write(" ") + else: + buf.write(", ") + i += 1 + try: + json.dump(arg, buf) + except exceptions.Exception as x: + buf.write("***") + buf.write(x.message) + buf.write("***") + return buf.getvalue() + + def toError(self, data, include_command_text=True): + if not data: return None + map = data + bf = cStringIO.StringIO() + bf.write("TCF error report:\n") + if include_command_text: + cmd = self.getCommandString() + if len(cmd) > 120: cmd = cmd[:120] + "..." + bf.write("Command: ") + bf.write(cmd) + errors.appendErrorProps(bf, map) + return errors.ErrorReport(bf.getvalue(), map) diff --git a/python/src/tcf/channel/StreamChannel.py b/python/src/tcf/channel/StreamChannel.py new file mode 100644 index 000000000..1b409c7a3 --- /dev/null +++ b/python/src/tcf/channel/StreamChannel.py @@ -0,0 +1,130 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import types
+from AbstractChannel import AbstractChannel, EOS, EOM
+
+ESC = 3
+
+class StreamChannel(AbstractChannel):
+ """
+ Abstract channel implementation for stream oriented transport protocols.
+
+ StreamChannel implements communication link connecting two end points (peers).
+ The channel asynchronously transmits messages: commands, results and events.
+
+ StreamChannel uses escape sequences to represent End-Of-Message and End-Of-Stream markers.
+
+ Clients can subclass StreamChannel to support particular stream oriented transport (wire) protocol.
+ Also, see ChannelTCP for a concrete IChannel implementation that works on top of TCP sockets as a transport.
+ """
+
+ def __init__(self, remote_peer, local_peer=None):
+ super(StreamChannel, self).__init__(remote_peer, local_peer=local_peer)
+ self.bin_data_size = 0
+ self.buf = bytearray(0x1000)
+ self.buf_pos = 0
+ self.buf_len = 0
+
+ def get(self):
+ pass
+ def put(self, n):
+ pass
+
+ def getBuf(self, buf):
+ i = 0
+ l = len(buf)
+ while i < l:
+ b = self.get()
+ if b < 0:
+ if i == 0: return -1
+ break
+ buf[i] = b
+ i += 1
+ if i >= self.bin_data_size: break
+ return i
+
+ def putBuf(self, buf):
+ for b in buf: self.put(b & 0xff)
+
+ def read(self):
+ while True:
+ while self.buf_pos >= self.buf_len:
+ self.buf_len = self.getBuf(self.buf)
+ self.buf_pos = 0
+ if self.buf_len < 0: return EOS
+ res = self.buf[self.buf_pos] & 0xff
+ self.buf_pos += 1
+ if self.bin_data_size > 0:
+ self.bin_data_size -= 1
+ return res
+ if res != ESC: return res
+ while self.buf_pos >= self.buf_len:
+ self.buf_len = self.getBuf(self.buf)
+ self.buf_pos = 0
+ if self.buf_len < 0: return EOS
+ n = self.buf[self.buf_pos] & 0xff
+ self.buf_pos += 1
+ if n == 0: return ESC
+ elif n == 1: return EOM
+ elif n == 2: return EOS
+ elif n == 3:
+ for i in xrange(0, 100000, 7):
+ while self.buf_pos >= self.buf_len:
+ self.buf_len = self.getBuf(self.buf)
+ self.buf_pos = 0
+ if self.buf_len < 0: return EOS
+ m = self.buf[self.buf_pos] & 0xff
+ self.buf_pos += 1
+ self.bin_data_size |= (m & 0x7f) << i
+ if (m & 0x80) == 0: break
+ else:
+ if n < 0: return EOS
+ assert False
+
+ def writeByte(self, n):
+ if n == ESC:
+ self.put(ESC)
+ self.put(0)
+ elif n == EOM:
+ self.put(ESC)
+ self.put(1)
+ elif n == EOS:
+ self.put(ESC)
+ self.put(2)
+ else:
+ assert n >= 0 and n <= 0xff
+ self.put(n)
+
+ def write(self, buf):
+ t = type(buf)
+ if t == types.IntType:
+ self.writeByte(buf)
+ return
+ elif t == types.StringType:
+ buf = bytearray(buf)
+ if len(buf) > 32 and self.isZeroCopySupported():
+ self.put(ESC)
+ self.put(3)
+ n = len(buf)
+ while True:
+ if n <= 0x7f:
+ self.put(n)
+ break
+ self.put((n & 0x7f) | 0x80)
+ n >>= 7
+ self.putBuf(buf)
+ else:
+ for b in buf:
+ n = b & 0xff
+ self.put(n)
+ if n == ESC:
+ self.put(0)
diff --git a/python/src/tcf/channel/__init__.py b/python/src/tcf/channel/__init__.py new file mode 100644 index 000000000..1a96a3b7d --- /dev/null +++ b/python/src/tcf/channel/__init__.py @@ -0,0 +1,145 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import cStringIO, json, binascii, types, exceptions
+
+# channel states
+STATE_OPENING = 0
+STATE_OPEN = 1
+STATE_CLOSED = 2
+
+class TraceListener(object):
+ def onMessageReceived(self, type, token, service, name, data):
+ pass
+
+ def onMessageSent(self, type, token, service, name, data):
+ pass
+
+ def onChannelClosed(self, error):
+ pass
+
+def Proxy(object):
+ def onCommand(self, token, service, name, data):
+ pass
+
+ def onEvent(self, service, name, data):
+ pass
+
+ def onChannelClosed(self, error):
+ pass
+
+_token_cnt = 0
+class Token(object):
+ def __init__(self, id=None, listener=None):
+ if id is None:
+ global _token_cnt
+ id = str(_token_cnt)
+ _token_cnt += 1
+ else:
+ id = str(id)
+ self.id = id
+ self.listener = listener
+ def getID(self):
+ return self.id
+ def getListener(self):
+ return self.listener
+ def cancel(self):
+ return False
+
+class ChannelListener(object):
+ """
+ Channel listener interface.
+ """
+
+ def onChannelOpened(self):
+ """
+ Called when a channel is opened or redirected.
+ """
+ pass
+
+ def onChannelClosed(self, error):
+ """
+ Called when channel closed. If it is closed because of an error,
+ 'error' parameter will describe the error. 'error' is null if channel
+ is closed normally by calling Channel.close().
+ @param error - channel exception or null
+ """
+ pass
+
+ def congestionLevel(self, level):
+ """
+ Notifies listeners about channel out-bound traffic congestion level changes.
+ When level > 0 client should delay sending more messages.
+ @param level - current congestion level
+ """
+ pass
+
+class EventListener(object):
+ """
+ A generic interface for service event listener.
+ Services usually define a service specific event listener interface,
+ which is implemented using this generic listener.
+ Clients should user service specific listener interface,
+ unless no such interface is defined.
+ """
+ svc_name = "<unknown>"
+ def event(self, name, data):
+ """
+ Called when service event message is received
+ @param name - event name
+ @param data - event arguments encoded as bytearray
+ """
+ pass
+
+class CommandServer(object):
+ """
+ Command server interface.
+ This interface is to be implemented by service providers.
+ """
+ def command(self, token, name, data):
+ """
+ Called every time a command is received from remote peer.
+ @param token - command handle
+ @param name - command name
+ @param data - command arguments encoded into array of bytes
+ """
+ pass
+
+def toJSONSequence(args):
+ if args is None:
+ return None
+ buf = cStringIO.StringIO()
+ for arg in args:
+ json.dump(arg, buf, separators=(',', ':'))
+ buf.write('\0')
+ return buf.getvalue()
+
+def fromJSONSequence(bytes):
+ if bytes[-1] == 0:
+ del bytes[-1]
+ str = bytes.decode("UTF-8")
+ parts = str.split('\0')
+ objects = []
+ for part in parts:
+ if part:
+ objects.append(json.loads(part))
+ else:
+ objects.append(None)
+ return objects
+
+_ByteArrayType = type(bytearray())
+def toByteArray(data):
+ if data is None: return None
+ t = type(data)
+ if t is _ByteArrayType: return data
+ if t is types.StringType:
+ return binascii.a2b_base64(data)
+ raise exceptions.Exception()
diff --git a/python/src/tcf/errors.py b/python/src/tcf/errors.py new file mode 100644 index 000000000..7a6c69364 --- /dev/null +++ b/python/src/tcf/errors.py @@ -0,0 +1,161 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http:#www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import exceptions, cStringIO, time, types
+
+# Error report attribute names
+ERROR_CODE = "Code" # integer
+ERROR_TIME = "Time" # integer
+ERROR_SERVICE = "Service" # string
+ERROR_FORMAT = "Format" # string
+ERROR_PARAMS = "Params" # array
+ERROR_SEVERITY = "Severity" # integer
+ERROR_ALT_CODE = "AltCode" # integer
+ERROR_ALT_ORG = "AltOrg" # string
+ERROR_CAUSED_BY = "CausedBy" # object
+
+# Error severity codes
+SEVERITY_ERROR = 0
+SEVERITY_WARNING = 1
+SEVERITY_FATAL = 2
+
+# Error code ranges
+# Standard TCF code range */
+CODE_STD_MIN = 0
+CODE_STD_MAX = 0xffff
+
+# Service specific codes. Decoding requires service ID. */
+CODE_SERVICE_SPECIFIC_MIN = 0x10000
+CODE_SERVICE_SPECIFIC_MAX = 0x1ffff
+
+# Reserved codes - will never be used by the TCF standard */
+CODE_RESERVED_MIN = 0x20000
+CODE_RESERVED_MAX = 0x2ffff
+
+# Standard TCF error codes
+TCF_ERROR_OTHER = 1
+TCF_ERROR_JSON_SYNTAX = 2
+TCF_ERROR_PROTOCOL = 3
+TCF_ERROR_BUFFER_OVERFLOW = 4
+TCF_ERROR_CHANNEL_CLOSED = 5
+TCF_ERROR_COMMAND_CANCELLED = 6
+TCF_ERROR_UNKNOWN_PEER = 7
+TCF_ERROR_BASE64 = 8
+TCF_ERROR_EOF = 9
+TCF_ERROR_ALREADY_STOPPED = 10
+TCF_ERROR_ALREADY_EXITED = 11
+TCF_ERROR_ALREADY_RUNNING = 12
+TCF_ERROR_ALREADY_ATTACHED = 13
+TCF_ERROR_IS_RUNNING = 14
+TCF_ERROR_INV_DATA_SIZE = 15
+TCF_ERROR_INV_CONTEXT = 16
+TCF_ERROR_INV_ADDRESS = 17
+TCF_ERROR_INV_EXPRESSION = 18
+TCF_ERROR_INV_FORMAT = 19
+TCF_ERROR_INV_NUMBER = 20
+TCF_ERROR_INV_DWARF = 21
+TCF_ERROR_SYM_NOT_FOUND = 22
+TCF_ERROR_UNSUPPORTED = 23
+TCF_ERROR_INV_DATA_TYPE = 24
+TCF_ERROR_INV_COMMAND = 25
+TCF_ERROR_INV_TRANSPORT = 26
+TCF_ERROR_CACHE_MISS = 27
+TCF_ERROR_NOT_ACTIVE = 28
+
+_timestamp_format = "%Y-%m-%d %H:%M:%S"
+
+class ErrorReport(exceptions.Exception):
+ def __init__(self, msg, attrs):
+ super(ErrorReport, self).__init__(msg)
+ if type(attrs) is types.IntType:
+ attrs = {
+ ERROR_CODE : attrs,
+ ERROR_TIME : time.time(),
+ ERROR_FORMAT : msg,
+ ERROR_SEVERITY : SEVERITY_ERROR
+ }
+ self.attrs = attrs
+ caused_by = attrs.get(ERROR_CAUSED_BY)
+ if caused_by:
+ map = caused_by
+ bf = cStringIO.StringIO()
+ bf.write("TCF error report:")
+ bf.write('\n')
+ appendErrorProps(bf, map)
+ self.caused_by = ErrorReport(bf.getvalue(), map)
+
+ def getErrorCode(self):
+ return self.attrs.get(ERROR_CODE) or 0
+
+ def getAltCode(self):
+ return self.attrs.get(ERROR_ALT_CODE) or 0
+
+ def getAltOrg(self):
+ return self.attrs.get(ERROR_ALT_ORG)
+
+ def getAttributes(self):
+ return self.attrs
+
+
+def toErrorString(data):
+ if not data: return None
+ map = data
+ fmt = map.get(ERROR_FORMAT)
+ if fmt:
+ c = map.get(ERROR_PARAMS)
+ if c: return fmt.format(c)
+ return fmt
+ code = map.get(ERROR_CODE)
+ if code is not None:
+ if code == TCF_ERROR_OTHER:
+ alt_org = map.get(ERROR_ALT_ORG)
+ alt_code = map.get(ERROR_ALT_CODE)
+ if alt_org and alt_code:
+ return "%s Error %d" % (alt_org, alt_code)
+ return "TCF Error %d" % code
+ return "Invalid error report format"
+
+def appendErrorProps(bf, map):
+ timeVal = map.get(ERROR_TIME)
+ code = map.get(ERROR_CODE)
+ service = map.get(ERROR_SERVICE)
+ severity = map.get(ERROR_SEVERITY)
+ alt_code = map.get(ERROR_ALT_CODE)
+ alt_org = map.get(ERROR_ALT_ORG)
+ if timeVal:
+ bf.write('\n')
+ bf.write("Time: ")
+ bf.write(time.strftime(_timestamp_format, time.localtime(timeVal/1000.)))
+ if severity:
+ bf.write('\n')
+ bf.write("Severity: ")
+ if severity == SEVERITY_ERROR: bf.write("Error")
+ elif severity == SEVERITY_FATAL: bf.write("Fatal")
+ elif severity == SEVERITY_WARNING: bf.write("Warning")
+ else: bf.write("Unknown")
+ bf.write('\n')
+ bf.write("Error text: ")
+ bf.write(toErrorString(map))
+ bf.write('\n')
+ bf.write("Error code: ")
+ bf.write(str(code))
+ if service:
+ bf.write('\n')
+ bf.write("Service: ")
+ bf.write(service)
+ if alt_code:
+ bf.write('\n')
+ bf.write("Alt code: ")
+ bf.write(str(alt_code))
+ if alt_org:
+ bf.write('\n')
+ bf.write("Alt org: ")
+ bf.write(alt_org)
diff --git a/python/src/tcf/peer.py b/python/src/tcf/peer.py new file mode 100644 index 000000000..37e076123 --- /dev/null +++ b/python/src/tcf/peer.py @@ -0,0 +1,276 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+"""
+Both hosts and targets are represented by objects
+implementing IPeer interface. A peer can act as host or
+target depending on services it implements.
+List of currently known peers can be retrieved by
+calling Locator.getPeers()
+
+A TCF agent houses one or more service managers. A service manager has a one or more
+services to expose to the world. The service manager creates one or more peers
+to represent itself, one for every access path the agent is
+reachable by. For example, in agents accessible via TCP/IP, the
+service manger would create a peer for every subnet it wants to participate in.
+All peers of particular service manager represent identical sets of services.
+"""
+
+import os, exceptions, time, json
+from tcf import protocol, transport, services
+from tcf.services import locator
+
+# Peer unique ID
+ATTR_ID = "ID"
+
+# Unique ID of service manager that is represented by this peer
+ATTR_SERVICE_MANAGER_ID = "ServiceManagerID"
+
+# Agent unique ID
+ATTR_AGENT_ID = "AgentID"
+
+# Peer name
+ATTR_NAME = "Name"
+
+# Name of the peer operating system
+ATTR_OS_NAME = "OSName"
+
+# Transport name, for example TCP, SSL
+ATTR_TRANSPORT_NAME = "TransportName"
+
+# If present, indicates that the peer can forward traffic to other peers
+ATTR_PROXY = "Proxy"
+
+# Host DNS name or IP address
+ATTR_IP_HOST = "Host"
+
+# Optional list of host aliases
+ATTR_IP_ALIASES = "Aliases"
+
+# Optional list of host addresses
+ATTR_IP_ADDRESSES = "Addresses"
+
+# IP port number, must be decimal number
+ATTR_IP_PORT = "Port"
+
+
+class Peer(object):
+ def __init__(self, attrs):
+ self.attrs = attrs
+ def getAttributes(self):
+ """@return map of peer attributes"""
+ return self.attrs
+
+ def getID(self):
+ """@return peer unique ID, same as getAttributes().get(ATTR_ID)"""
+ return self.attrs.get(ATTR_ID)
+
+ def getServiceManagerID(self):
+ """@return service manager unique ID, same as getAttributes().get(ATTR_SERVICE_MANAGER_ID)"""
+ assert protocol.isDispatchThread()
+ return self.attrs.get(ATTR_SERVICE_MANAGER_ID)
+
+ def getAgentID(self):
+ """@return agent unique ID, same as getAttributes().get(ATTR_AGENT_ID)"""
+ assert protocol.isDispatchThread()
+ return self.attrs.get(ATTR_AGENT_ID)
+
+ def getName(self):
+ """@return peer name, same as getAttributes().get(ATTR_NAME)"""
+ return self.attrs.get(ATTR_NAME)
+
+ def getOSName(self):
+ """@return agent OS name, same as getAttributes().get(ATTR_OS_NAME)"""
+ return self.attrs.get(ATTR_OS_NAME)
+
+ def getTransportName(self):
+ """@return transport name, same as getAttributes().get(ATTR_TRANSPORT_NAME)"""
+ return self.attrs.get(ATTR_TRANSPORT_NAME)
+
+ def openChannel(self):
+ """Open channel to communicate with this peer.
+ Note: the channel is not fully open yet when this method returns.
+ Its state is channel.STATE_OPENING.
+ Protocol.ChannelOpenListener and IChannel.IChannelListener listeners will be called when
+ the channel will change state to open or closed.
+ Clients are supposed to register IChannel.IChannelListener right after calling openChannel(), or,
+ at least, in same dispatch cycle. For example:
+ channel = peer.openChannel()
+ channel.addChannelListener(...)
+ """
+ raise exceptions.RuntimeError("Abstract method")
+
+
+class TransientPeer(Peer):
+ """
+ Transient implementation of IPeer interface.
+ Objects of this class are not tracked by Locator service.
+ See AbstractPeer for IPeer objects that should go into the Locator table.
+ """
+
+ rw_attrs = {}
+
+ def __init__(self, attrs):
+ self.rw_attrs.update(attrs)
+ # TODO readonly map
+ ro_attrs = {}
+ ro_attrs.update(self.rw_attrs)
+ super(TransientPeer, self).__init__(ro_attrs)
+
+ def openChannel(self):
+ return transport.openChannel(self)
+
+class LocalPeer(TransientPeer):
+ """
+ LocalPeer object represents local end-point of TCF communication channel.
+ There should be exactly one such object in a TCF agent.
+ The object can be used to open a loop-back communication channel that allows
+ the agent to access its own services same way as remote services.
+ Note that "local" here is relative to the agent, and not same as in "local host".
+ """
+ def __init__(self):
+ super(LocalPeer, self).__init__(self.createAttributes())
+
+ def createAttributes(self):
+ attrs = {
+ ATTR_ID : "TCFLocal",
+ ATTR_SERVICE_MANAGER_ID : services.getServiceManagerID(),
+ ATTR_AGENT_ID : protocol.getAgentID(),
+ ATTR_NAME : "Local Peer",
+ ATTR_OS_NAME : os.name,
+ ATTR_TRANSPORT_NAME : "Loop"
+ }
+ return attrs;
+
+class AbstractPeer(TransientPeer):
+ """
+ Abstract implementation of IPeer interface.
+ Objects of this class are stored in Locator service peer table.
+ The class implements sending notification events to Locator listeners.
+ See TransientPeer for IPeer objects that are not stored in the Locator table.
+ """
+
+ last_heart_beat_time = 0
+
+ def __init__(self, attrs):
+ super(AbstractPeer, self).__init__(attrs)
+ assert protocol.isDispatchThread()
+ id = self.getID()
+ assert id
+ peers = locator.getLocator().getPeers()
+ if isinstance(peers.get(id), RemotePeer):
+ peers.get(id).dispose()
+ assert not peers.has_key(id)
+ peers[id] = self
+ self.sendPeerAddedEvent()
+
+ def dispose(self):
+ assert protocol.isDispatchThread()
+ id = self.getID()
+ assert id
+ peers = locator.getLocator().getPeers()
+ assert peers.get(id) == self
+ del peers[id]
+ self.sendPeerRemovedEvent()
+
+ def onChannelTerminated(self):
+ # A channel to this peer was terminated:
+ # not delaying next heart beat helps client to recover much faster.
+ self.last_heart_beat_time = 0
+
+ def updateAttributes(self, attrs):
+ equ = True
+ assert attrs.get(ATTR_ID) == self.rw_attrs.get(ATTR_ID)
+ for key in self.rw_attrs.keys():
+ if self.rw_attrs.get(key) != attrs.get(key):
+ equ = False
+ break
+ for key in attrs.keys():
+ if attrs.get(key) != self.rw_attrs.get(key):
+ equ = False
+ break
+ timeVal = int(time.time())
+ if not equ:
+ self.rw_attrs.clear()
+ self.rw_attrs.update(attrs)
+ for l in locator.getListeners():
+ try:
+ l.peerChanged(self)
+ except exceptions.Exception as x:
+ protocol.log("Unhandled exception in Locator listener", x)
+ try:
+ args = [self.rw_attrs]
+ protocol.sendEvent(locator.NAME, "peerChanged", json.dumps(args))
+ except exceptions.IOError as x:
+ protocol.log("Locator: failed to send 'peerChanged' event", x)
+ self.last_heart_beat_time = timeVal
+ elif self.last_heart_beat_time + locator.DATA_RETENTION_PERIOD / 4 < timeVal:
+ for l in locator.getListeners():
+ try:
+ l.peerHeartBeat(attrs.get(ATTR_ID))
+ except exceptions.Exception as x:
+ protocol.log("Unhandled exception in Locator listener", x)
+ try:
+ args = [self.rw_attrs.get(ATTR_ID)]
+ protocol.sendEvent(locator.NAME, "peerHeartBeat", json.dumps(args))
+ except exceptions.IOError as x:
+ protocol.log("Locator: failed to send 'peerHeartBeat' event", x)
+ self.last_heart_beat_time = timeVal
+
+ def sendPeerAddedEvent(self):
+ for l in locator.getListeners():
+ try:
+ l.peerAdded(self)
+ except exceptions.Exception as x:
+ protocol.log("Unhandled exception in Locator listener", x)
+ try:
+ args = [self.rw_attrs]
+ protocol.sendEvent(locator.NAME, "peerAdded", json.dumps(args))
+ except exceptions.IOError as x:
+ protocol.log("Locator: failed to send 'peerAdded' event", x)
+ self.last_heart_beat_time = int(time.time())
+
+ def sendPeerRemovedEvent(self):
+ for l in locator.getListeners():
+ try:
+ l.peerRemoved(self.rw_attrs.get(ATTR_ID))
+ except exceptions.Exception as x:
+ protocol.log("Unhandled exception in Locator listener", x)
+ try:
+ args = [self.rw_attrs.get(ATTR_ID)]
+ protocol.sendEvent(locator.NAME, "peerRemoved", json.dumps(args))
+ except exceptions.IOError as x:
+ protocol.log("Locator: failed to send 'peerRemoved' event", x)
+
+
+class RemotePeer(AbstractPeer):
+ """
+ RemotePeer objects represent TCF agents that Locator service discovered on local network.
+ This includes both local host agents and remote host agents.
+ Note that "remote peer" means any peer accessible over network,
+ it does not imply the agent is running on a "remote host".
+ If an agent binds multiple network interfaces or multiple ports, it can be represented by
+ multiple RemotePeer objects - one per each network address/port combination.
+ RemotePeer objects life cycle is managed by Locator service.
+ """
+
+ last_update_time = 0
+
+ def __init__(self, attrs):
+ super(RemotePeer, self).__init__(attrs)
+ self.last_update_time = int(time.time())
+
+ def updateAttributes(self, attrs):
+ super(RemotePeer, self).updateAttributes(attrs)
+ self.last_update_time = int(time.time())
+
+ def getLastUpdateTime(self):
+ return self.last_update_time
diff --git a/python/src/tcf/protocol.py b/python/src/tcf/protocol.py new file mode 100644 index 000000000..2a00c31ff --- /dev/null +++ b/python/src/tcf/protocol.py @@ -0,0 +1,312 @@ +# ******************************************************************************* +# * Copyright (c) 2011 Wind River Systems, Inc. and others. +# * All rights reserved. This program and the accompanying materials +# * are made available under the terms of the Eclipse Public License v1.0 +# * which accompanies this distribution, and is available at +# * http://www.eclipse.org/legal/epl-v10.html +# * +# * Contributors: +# * Wind River Systems - initial API and implementation +# ******************************************************************************* + +""" +Module protocol provides static methods to access Target Communication Framework root objects: +1. the framework event queue and dispatch thread +2. local instance of Locator service, which maintains a list of available targets +3. list of open communication channels. + +It also provides utility methods for posting asynchronous events, +including delayed events (timers). +""" + +import sys, uuid, threading, exceptions +from EventQueue import EventQueue + +_event_queue = None +def startEventQueue(): + global _event_queue + if _event_queue and not _event_queue.isShutdown(): return + _event_queue = EventQueue() + _event_queue.start() + +def getEventQueue(): + """ + @return instance of event queue that is used for TCF events. + """ + return _event_queue + +def isDispatchThread(): + """ + Returns true if the calling thread is the TCF event dispatch thread. + Use this call to ensure that a given task is being executed (or not being) + on dispatch thread. + + @return true if running on the dispatch thread. + """ + return _event_queue is not None and _event_queue.isDispatchThread() + +def invokeLater(callable, *args, **kwargs): + """ + Causes callable to be called with given arguments + in the dispatch thread of the framework. + Events are dispatched in same order as queued. + If invokeLater is called from the dispatching thread + the callable will still be deferred until + all pending events have been processed. + + This method can be invoked from any thread. + + @param callable the callable to be executed asynchronously + """ + _event_queue.invokeLater(callable, *args, **kwargs) + +def invokeLaterWithDelay(delay, callable, *args, **kwargs): + """ + Causes callable event to called in the dispatch thread of the framework. + The event is dispatched after given delay. + + This method can be invoked from any thread. + + @param delay milliseconds to delay event dispatch. + If delay <= 0 the event is posted into the + "ready" queue without delay. + @param callable the callable to be executed asynchronously. + """ + if delay <= 0: + _event_queue.invokeLater(callable, *args, **kwargs) + else: + # TODO timer_queue + raise exceptions.NotImplementedError("Implement invokeLaterWithDelay") +# synchronized (timer_queue) { +# timer_queue.add(new Timer(System.currentTimeMillis() + delay, runnable)) +# timer_queue.notify() + +def invokeAndWait(callable, *args, **kwargs): + """ + Causes callable to be called in the dispatch thread of the framework. + Calling thread is suspended until the method is executed. + If invokeAndWait is called from the dispatching thread + the callable is executed immediately. + + This method can be invoked from any thread. + + @param runnable the callable to be executed on dispatch thread. + """ + if _event_queue.isDispatchThread(): + return callable(*args, **kwargs) + else: + class DoRun(): + result = None + def __call__(self): + try: + self.result = callable(*args, **kwargs) + finally: + with runLock: + runLock.notify() + doRun = DoRun() + runLock = threading.Condition() + with runLock: + _event_queue.invokeLater(doRun) + runLock.wait() + return doRun.result + +_agentID = str(uuid.uuid4()) +def getAgentID(): + return _agentID + +_logger = None +def setLogger(logger): + """ + Set framework logger. + By default sys.stderr is used. + + @param logger - an object implementing Logger interface. + """ + global _logger + _logger = logger + +def log(msg, x=None): + """ + Logs the given message. + @see #setLogger + This method can be invoked from any thread. + @param msg - log entry text + @param x - an exception associated with the log entry or null. + """ + if not _logger: + print>>sys.stderr, msg + while x: + import traceback + print>>sys.stderr, "%s: %s" % (type(x).__name__, x) + tb = getattr(x, "tb", None) or sys.exc_info()[2] + if tb: + traceback.print_tb(tb) + caused_by = getattr(x, "caused_by", None) + if caused_by: + print>>sys.stderr, "Caused by:" + x = caused_by + else: + break + else: + _logger.log(msg, x) + +def getLocator(): + from services import locator + return locator.getLocator() + +def getOpenChannels(): + """ + Return an array of all open channels. + @return an array of IChannel + """ + assert isDispatchThread() + import transport + return transport.getOpenChannels() + +class ChannelOpenListener(object): + """ + Interface to be implemented by clients willing to be notified when + new TCF communication channel is opened. + + The interface allows a client to get pointers to channel objects + that were opened by somebody else. If a client open a channel itself, it already has + the pointer and does not need Protocol.ChannelOpenListener. If a channel is created, + for example, by remote peer connecting to the client, the only way to get the pointer + is Protocol.ChannelOpenListener. + """ + def onChannelOpen(self, channel): + pass + +def addChannelOpenListener(listener): + """ + Add a listener that will be notified when new channel is opened. + @param listener + """ + assert isDispatchThread() + import transport + transport.addChannelOpenListener(listener) + +def removeChannelOpenListener(listener): + """ + Remove channel opening listener. + @param listener + """ + assert isDispatchThread() + import transport + transport.removeChannelOpenListener(listener) + +def sendEvent(service_name, event_name, data): + """ + Transmit TCF event message. + The message is sent to all open communication channels - broadcasted. + """ + assert isDispatchThread() + import transport + transport.sendEvent(service_name, event_name, data) + +def sync(done): + """ + Call back after all TCF messages sent by this host up to this moment are delivered + to their intended target. This method is intended for synchronization of messages + across multiple channels. + + Note: Cross channel synchronization can reduce performance and throughput. + Most clients don't need cross channel synchronization and should not call this method. + + @param done will be executed by dispatch thread after pending communication + messages are delivered to corresponding targets. + """ + assert isDispatchThread() + import transport + transport.sync(done) + +class CongestionMonitor(object): + """ + Clients implement CongestionMonitor interface to monitor usage of local resources, + like, for example, display queue size - if the queue becomes too big, UI response time + can become too high, or it can crash all together because of OutOfMemory errors. + TCF flow control logic prevents such conditions by throttling traffic coming from remote peers. + Note: Local (in-bound traffic) congestion is detected by framework and reported to + remote peer without client needed to be involved. Only clients willing to provide + additional data about local congestion should implement CongestionMonitor and + register it using Protocol.addCongestionMonitor(). + """ + def getCongestionLevel(self): + """ + Get current level of client resource utilization. + @return integer value in range -100..100, where -100 means all resources are free, + 0 means optimal load, and positive numbers indicate level of congestion. + """ + raise exceptions.NotImplementedError("Abstract method") + +_congestion_monitors = [] +def addCongestionMonitor(monitor): + """ + Register a congestion monitor. + @param monitor - client implementation of CongestionMonitor interface + """ + assert isDispatchThread() + _congestion_monitors.add(monitor) + +def removeCongestionMonitor(monitor): + """ + Unregister a congestion monitor. + @param monitor - client implementation of CongestionMonitor interface + """ + assert isDispatchThread() + _congestion_monitors.remove(monitor) + +def getCongestionLevel(): + """ + Get current level of local traffic congestion. + + @return integer value in range -100..100, where -100 means no pending + messages (no traffic), 0 means optimal load, and positive numbers + indicate level of congestion. + """ + assert isDispatchThread() + level = -100 + for m in _congestion_monitors: + n = m.getCongestionLevel() + if n > level: level = n + if _event_queue: + n = _event_queue.getCongestion() + if n > level: level = n + if level > 100: level = 100 + return level + +def addServiceProvider(provider): + """ + Register service provider. + This method can be invoked from any thread. + @param provider - ServiceProvider implementation + """ + import services + services.addServiceProvider(provider) + +def removeServiceProvider(provider): + """ + Unregister service provider. + This method can be invoked from any thread. + @param provider - ServiceProvider implementation + """ + import services + services.removeServiceProvider(provider) + +def addTransportProvider(provider): + """ + Register transport provider. + This method can be invoked from any thread. + @param provider - TransportProvider implementation + """ + import transport + transport.addTransportProvider(provider) + +def removeTransportProvider(provider): + """ + Unregister transport provider. + This method can be invoked from any thread. + @param provider - TransportProvider implementation + """ + import transport + transport.removeTransportProvider(provider) diff --git a/python/src/tcf/services/__init__.py b/python/src/tcf/services/__init__.py new file mode 100644 index 000000000..349a2b52a --- /dev/null +++ b/python/src/tcf/services/__init__.py @@ -0,0 +1,115 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import threading, exceptions
+from tcf import protocol
+
+_providers = []
+_lock = threading.RLock()
+
+class ServiceProvider(object):
+ """
+ Clients can implement this abstract class if they want to provide implementation of a local service or
+ remote service proxy.
+ """
+ def getLocalService(self, channel):
+ pass
+ def getServiceProxy(self, channel, service_name):
+ pass
+
+def addServiceProvider(provider):
+ with _lock:
+ _providers.append(provider)
+
+def removeServiceProvider(provider):
+ with _lock:
+ _providers.remove(provider)
+
+def onChannelCreated(channel, services_by_name):
+ with _lock:
+ zero_copy = ZeroCopy()
+ services_by_name[zero_copy.getName()] = zero_copy
+ for provider in _providers:
+ try:
+ arr = provider.getLocalService(channel)
+ if not arr: continue
+ for service in arr:
+ if services_by_name.has_key(service.getName()): continue
+ services_by_name[service.getName()] = service
+ except exceptions.Exception as x:
+ protocol.log("Error calling TCF service provider", x);
+
+def onChannelOpened(channel, service_names, services_by_name):
+ with _lock:
+ for name in service_names:
+ for provider in _providers:
+ try:
+ service = provider.getServiceProxy(channel, name)
+ if not service: continue
+ services_by_name[name] = service
+ break
+ except exceptions.Exception as x:
+ protocol.log("Error calling TCF service provider", x)
+ if services_by_name.has_key(name): continue
+ services_by_name[name] = GenericProxy(channel, name)
+
+def getServiceManagerID():
+ # In current implementation ServiceManager is a singleton,
+ # so its ID is same as agent ID.
+ return protocol.getAgentID()
+
+class Service(object):
+ def getName(self):
+ raise exceptions.NotImplementedError("Abstract method")
+ def __str__(self):
+ return self.getName()
+
+class ZeroCopy(Service):
+ def getName(self):
+ return "ZeroCopy"
+
+class GenericProxy(Service):
+ """
+ * Objects of GenericProxy class represent remote services, which don't
+ * have a proxy class defined for them.
+ * Clients still can use such services, but framework will not provide
+ * service specific utility methods for message formatting and parsing.
+ """
+ def __init__(self, channel, name):
+ self.__channel = channel
+ self.name = name
+ def getName(self):
+ return self.name
+ def getChannel(self):
+ return self.__channel
+
+class DefaultServiceProvider(ServiceProvider):
+ package_base = "tcf.services.remote"
+ def getLocalService(self, channel):
+ # TODO DiagnosticsService
+ #return [DiagnosticsService(channel)]
+ return []
+ def getServiceProxy(self, channel, service_name):
+ service = None
+ try:
+ clsName = service_name + "Proxy"
+ package = self.package_base + "." + clsName
+ clsModule = __import__(package, fromlist=[clsName], globals=globals())
+ cls = clsModule.__dict__.get(clsName)
+ service = cls(channel)
+ assert service_name == service.getName()
+ except exceptions.ImportError:
+ pass
+ except exceptions.Exception as x:
+ protocol.log("Cannot instantiate service proxy for "+service_name, x)
+ return service
+
+addServiceProvider(DefaultServiceProvider())
diff --git a/python/src/tcf/services/breakpoints.py b/python/src/tcf/services/breakpoints.py new file mode 100644 index 000000000..97ee9b2c8 --- /dev/null +++ b/python/src/tcf/services/breakpoints.py @@ -0,0 +1,314 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import exceptions
+from tcf import services
+
+"""
+Breakpoint is represented by unique identifier and set of properties.
+Breakpoint identifier (String id) needs to be unique across all hosts and targets.
+
+Breakpoint properties (Map<String,Object>) is extendible collection of named attributes,
+which define breakpoint location and behavior. This module defines some common
+attribute names (see PROP_*), host tools and target agents may support additional attributes.
+
+For each breakpoint a target agent maintains another extendible collection of named attributes:
+breakpoint status (Map<String,Object>, see STATUS_*). While breakpoint properties are
+persistent and represent user input, breakpoint status reflects dynamic target agent reports
+about breakpoint current state, like actual addresses where breakpoint is planted or planting errors.
+"""
+
+# Service name.
+NAME = "Breakpoints"
+
+# Breakpoint property names.
+PROP_ID = "ID" # String
+PROP_ENABLED = "Enabled" # Boolean
+PROP_TYPE = "BreakpointType" # String
+PROP_CONTEXTNAMES = "ContextNames" # Array
+PROP_CONTEXTIDS = "ContextIds" # Array
+PROP_EXECUTABLEPATHS = "ExecPaths" # Array
+PROP_LOCATION = "Location" # String
+PROP_SIZE = "Size" # Number
+PROP_ACCESSMODE = "AccessMode" # Number
+PROP_FILE = "File" # String
+PROP_LINE = "Line" # Number
+PROP_COLUMN = "Column" # Number
+PROP_PATTERN = "MaskValue" # Number
+PROP_MASK = "Mask" # Number
+PROP_STOP_GROUP = "StopGroup" # Array
+PROP_IGNORECOUNT = "IgnoreCount" # Number
+PROP_TIME = "Time" # Number
+PROP_SCALE = "TimeScale" # String
+PROP_UNITS = "TimeUnits" # String
+PROP_CONDITION = "Condition" # String
+PROP_TEMPORARY = "Temporary" # Boolean
+
+# BreakpointType values
+TYPE_SOFTWARE = "Software",
+TYPE_HARDWARE = "Hardware"
+TYPE_AUTO = "Auto"
+
+# AccessMode values
+ACCESSMODE_READ = 0x01
+ACCESSMODE_WRITE = 0x02
+ACCESSMODE_EXECUTE = 0x04
+ACCESSMODE_CHANGE = 0x08
+
+# TimeScale values
+TIMESCALE_RELATIVE = "Relative"
+TIMESCALE_ABSOLUTE = "Absolute"
+
+# TimeUnits values
+TIMEUNIT_NSECS = "Nanoseconds"
+TIMEUNIT_CYCLE_COUNT = "CycleCount"
+TIMEUNIT_INSTRUCTION_COUNT = "InstructionCount"
+
+# Breakpoint status field names.
+STATUS_INSTANCES = "Instances" # Array of Map<String,Object>
+STATUS_ERROR= "Error" # String
+STATUS_FILE = "File" # String
+STATUS_LINE = "Line" # Number
+STATUS_COLUMN = "Column" # Number
+
+# Breakpoint instance field names.
+INSTANCE_ERROR = "Error" # String
+INSTANCE_CONTEXT = "LocationContext" # String
+INSTANCE_ADDRESS = "Address" # Number
+
+# Breakpoint service capabilities.
+CAPABILITY_CONTEXT_ID = "ID" # String
+CAPABILITY_HAS_CHILDREN = "HasChildren" # Boolean
+CAPABILITY_LOCATION = "Location" # Boolean
+CAPABILITY_CONDITION = "Condition" # Boolean
+CAPABILITY_FILE_LINE = "FileLine" # Boolean
+CAPABILITY_CONTEXTIDS = "ContextIds" # Boolean
+CAPABILITY_STOP_GROUP = "StopGroup" # Boolean
+CAPABILITY_IGNORECOUNT = "IgnoreCount" # Boolean
+CAPABILITY_ACCESSMODE = "AccessMode" # Number
+
+class BreakpointsService(services.Service):
+ def getName(self):
+ return NAME
+
+ def set(self, properties, done):
+ """
+ Download breakpoints data to target agent.
+ The command is intended to be used only to initialize target breakpoints table
+ when communication channel is open. After that, host should
+ notify target about (incremental) changes in breakpoint data by sending
+ add, change and remove commands.
+
+ @param properties - array of breakpoints.
+ @param done - command result call back object.
+ @return - pending command handle.
+ @see DoneCommand
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def add(self, properties, done):
+ """
+ Called when breakpoint is added into breakpoints table.
+ @param properties - breakpoint properties.
+ @param done - command result call back object.
+ @return - pending command handle.
+ @see DoneCommand
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def change(self, properties, done):
+ """
+ Called when breakpoint properties are changed.
+ @param properties - breakpoint properties.
+ @param done - command result call back object.
+ @return - pending command handle.
+ @see DoneCommand
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def enable(self, ids, done):
+ """
+ Tell target to change (only) PROP_ENABLED breakpoint property to 'true'.
+ @param ids - array of enabled breakpoint identifiers.
+ @param done - command result call back object.
+ @return - pending command handle.
+ @see DoneCommand
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def disable(self, ids, done):
+ """
+ Tell target to change (only) PROP_ENABLED breakpoint property to 'false'.
+ @param ids - array of disabled breakpoint identifiers.
+ @param done - command result call back object.
+ @return - pending command handle.
+ @see DoneCommand
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def remove(self, ids, done):
+ """
+ Tell target to remove breakpoints.
+ @param id - unique breakpoint identifier.
+ @param done - command result call back object.
+ @return - pending command handle.
+ @see DoneCommand
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def getIDs(self, done):
+ """
+ Upload IDs of breakpoints known to target agent.
+ @param done - command result call back object.
+ @return - pending command handle.
+ @see DoneGetIDs
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def getProperties(self, id, done):
+ """
+ Upload properties of given breakpoint from target agent breakpoint table.
+ @param id - unique breakpoint identifier.
+ @param done - command result call back object.
+ @see DoneGetProperties
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def getStatus(self, id, done):
+ """
+ Upload status of given breakpoint from target agent.
+ @param id - unique breakpoint identifier.
+ @param done - command result call back object.
+ @return - pending command handle.
+ @see DoneGetStatus
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def getCapabilities(self, id, done):
+ """
+ Report breakpoint service capabilities to clients so they
+ can adjust to different implementations of the service.
+ When called with a None ("") context ID the global capabilities are returned,
+ otherwise context specific capabilities are returned. A special capability
+ property is used to indicate that all child contexts have the same
+ capabilities.
+ @param id - a context ID or None.
+ @param done - command result call back object.
+ @return - pending command handle.
+ @see DoneGetCapabilities
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def addListener(self, listener):
+ """
+ Add breakpoints service event listener.
+ @param listener - object that implements BreakpointsListener interface.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def removeListener(self, listener):
+ """
+ Remove breakpoints service event listener.
+ @param listener - object that implements BreakpointsListener interface.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+
+class DoneCommand(object):
+ "Call back interface for breakpoint service commands."
+ def doneCommand(self, token, error):
+ """
+ Called when command is done.
+ @param token - command handle.
+ @param error - error object or None.
+ """
+ pass
+
+class DoneGetIDs(object):
+ "Call back interface for 'getIDs' command."
+ def doneGetIDs(self, token, error, ids):
+ """
+ Called when 'getIDs' command is done.
+ @param token - command handle.
+ @param error - error object or None.
+ @param ids - IDs of breakpoints known to target agent.
+ """
+ pass
+
+class DoneGetProperties(object):
+ "Call back interface for 'getProperties' command."
+ def doneGetProperties(self, token, error, properties):
+ """
+ Called when 'getProperties' command is done.
+ @param token - command handle.
+ @param error - error object or None.
+ @param properties - properties of the breakpoint.
+ """
+ pass
+
+class DoneGetStatus(object):
+ "Call back interface for 'getStatus' command."
+ def doneGetStatus(self, token, error, status):
+ """
+ Called when 'getStatus' command is done.
+ @param token - command handle.
+ @param error - error object or None.
+ @param status - status of the breakpoint.
+ """
+ pass
+
+class DoneGetCapabilities(object):
+ "Call back interface for 'getCapabilities' command."
+ def doneGetCapabilities(self, token, error, capabilities):
+ """
+ Called when 'getCapabilities' command is done.
+ @param token - command handle.
+ @param error - error object or None.
+ @param capabilities - breakpoints service capabilities description.
+ """
+ pass
+
+class BreakpointsListener(object):
+ """
+ Breakpoints service events listener.
+ Note that contextAdded, contextChanged and contextRemoved events carry exactly same set
+ of breakpoint properties that was sent by a client to a target. The purpose of these events is to
+ let all clients know about breakpoints that were created by other clients.
+ """
+
+ def breakpointStatusChanged(self, id, status):
+ """
+ Called when breakpoint status changes.
+ @param id - unique breakpoint identifier.
+ @param status - breakpoint status.
+ """
+ pass
+
+ def contextAdded(self, bps):
+ """
+ Called when a new breakpoints are added.
+ @param bps - array of breakpoints.
+ """
+ pass
+
+ def contextChanged(self, bps):
+ """
+ Called when breakpoint properties change.
+ @param bps - array of breakpoints.
+ """
+ pass
+
+ def contextRemoved(self, ids):
+ """
+ Called when breakpoints are removed .
+ @param ids - array of breakpoint IDs.
+ """
+ pass
diff --git a/python/src/tcf/services/locator.py b/python/src/tcf/services/locator.py new file mode 100644 index 000000000..5c7f69c65 --- /dev/null +++ b/python/src/tcf/services/locator.py @@ -0,0 +1,119 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import exceptions
+from tcf import services, protocol, channel
+
+"""
+Locator service uses transport layer to search for peers and to collect data about
+peer's attributes and capabilities (services). Discovery mechanism depends on transport protocol
+and is part of that protocol handler. Targets, known to other hosts, can be found through
+remote instances of Locator service. Automatically discovered targets require no further
+configuration. Additional targets can be configured manually.
+
+Clients should use protocol.getLocator() to obtain local instance of locator,
+then locator.getPeers() can be used to get list of available peers (hosts and targets).
+"""
+
+# Peer data retention period in milliseconds.
+DATA_RETENTION_PERIOD = 60 * 1000;
+
+# Auto-configuration protocol version.
+CONF_VERSION = '2'
+
+# Auto-configuration command and response codes.
+CONF_REQ_INFO = 1
+CONF_PEER_INFO = 2
+CONF_REQ_SLAVES = 3
+CONF_SLAVES_INFO = 4
+
+NAME = "Locator"
+
+_locator = None
+def getLocator():
+ global _locator
+ if _locator is None:
+ _locator = LocatorService()
+ services.addServiceProvider(LocatorServiceProvider())
+ return _locator
+
+class LocatorService(services.Service):
+ def getName(self):
+ return NAME
+ def getPeers(self):
+ """
+ Get map (ID -> IPeer) of available peers (hosts and targets).
+ The method return cached (currently known to the framework) list of peers.
+ The list is updated according to event received from transport layer
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+ def redirect(self, peer, done):
+ """
+ Redirect this service channel to given peer using this service as a proxy.
+ @param peer - Peer ID or attributes map.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+ def sync(self, done):
+ """
+ Call back after TCF messages sent to this target up to this moment are delivered.
+ This method is intended for synchronization of messages
+ across multiple channels.
+
+ Note: Cross channel synchronization can reduce performance and throughput.
+ Most clients don't need channel synchronization and should not call this method.
+
+ @param done will be executed by dispatch thread after communication
+ messages are delivered to corresponding targets.
+
+ This is internal API, TCF clients should use module 'tcf.protocol'.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+ def addListener(self, listener):
+ "Add a listener for Locator service events."
+ assert listener
+ assert protocol.isDispatchThread()
+ _listeners.append(listener)
+ def removeListener(self, listener):
+ "Remove a listener for Locator service events."
+ assert protocol.isDispatchThread()
+ _listeners.remove(listener)
+
+class LocatorServiceProvider(services.ServiceProvider):
+ def getLocalService(self, _channel):
+ class LocatorCommandServer(channel.CommandServer):
+ def command(self, token, name, data):
+ _locator.command(_channel, token, name, data)
+ _channel.addCommandServer(_locator, LocatorCommandServer())
+ return [_locator]
+ def getServiceProxy(self, channel, service_name):
+ return None
+
+class DoneRedirect(object):
+ def doneRedirect(self, token, error):
+ pass
+
+class DoneSync(object):
+ def doneSync(self, token):
+ pass
+
+class LocatorListener(object):
+ def peerAdded(self, peer):
+ pass
+ def peerChanged(self, peer):
+ pass
+ def peerRemoved(self, id):
+ pass
+ def peerHeartBeat(self, id):
+ pass
+
+_listeners = []
+def getListeners():
+ return _listeners
diff --git a/python/src/tcf/services/remote/BreakpointsProxy.py b/python/src/tcf/services/remote/BreakpointsProxy.py new file mode 100644 index 000000000..3458e52bd --- /dev/null +++ b/python/src/tcf/services/remote/BreakpointsProxy.py @@ -0,0 +1,140 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import exceptions
+from tcf import channel
+from tcf.services import breakpoints
+from tcf.channel.Command import Command
+
+class BPCommand(Command):
+ def __init__(self, service, cmd, cb, *args):
+ super(BPCommand, self).__init__(service.channel, service, cmd, args)
+ self.__cb = cb
+ def done(self, error, args):
+ if not error:
+ assert len(args) == 1
+ error = self.toError(args[0])
+ self.__cb.doneCommand(self.token, error)
+
+
+class ChannelEventListener(channel.EventListener):
+ def __init__(self, service, listener):
+ self.service = service
+ self.listener = listener
+ def event(self, name, data):
+ try:
+ args = channel.fromJSONSequence(data)
+ if name == "status":
+ assert len(args) == 2
+ self.listener.breakpointStatusChanged(args[0], args[1])
+ elif name == "contextAdded":
+ assert len(args) == 1
+ self.listener.contextAdded(args[0])
+ elif name == "contextChanged":
+ assert len(args) == 1
+ self.listener.contextChanged(args[0])
+ elif name == "contextRemoved":
+ assert len(args) == 1
+ self.listener.contextRemoved(args[0])
+ else:
+ raise IOError("Breakpoints service: unknown event: " + name);
+ except exceptions.Exception as x:
+ self.service.channel.terminate(x)
+
+class BreakpointsProxy(breakpoints.BreakpointsService):
+ def __init__(self, channel):
+ self.channel = channel
+ self.listeners = {}
+
+ def set(self, properties, done):
+ return BPCommand(self, "set", done, properties).token
+
+ def add(self, properties, done):
+ return BPCommand(self, "add", done, properties).token
+
+ def change(self, properties, done):
+ return BPCommand(self, "change", done, properties).token
+
+ def disable(self, ids, done):
+ return BPCommand(self, "disable", done, ids).token
+
+ def enable(self, ids, done):
+ return BPCommand(self, "enable", done, ids).token
+
+ def remove(self, ids, done):
+ return BPCommand(self, "remove", done, ids).token
+
+ def getIDs(self, done):
+ service = self
+ class GetIDsCommand(Command):
+ def __init__(self):
+ super(GetIDsCommand, self).__init__(service.channel, service, "getIDs", None)
+ def done(self, error, args):
+ ids = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ ids = args[1]
+ done.doneGetIDs(self.token, error, ids)
+ return GetIDsCommand().token
+
+ def getProperties(self, id, done):
+ service = self
+ class GetPropertiesCommand(Command):
+ def __init__(self):
+ super(GetPropertiesCommand, self).__init__(service.channel, service, "getProperties", (id,))
+ def done(self, error, args):
+ map = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ map = args[1]
+ done.doneGetProperties(self.token, error, map)
+ return GetPropertiesCommand().token
+
+ def getStatus(self, id, done):
+ service = self
+ class GetStatusCommand(Command):
+ def __init__(self):
+ super(GetStatusCommand, self).__init__(service.channel, service, "getStatus", (id,))
+ def done(self, error, args):
+ map = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ map = args[1]
+ done.doneGetStatus(self.token, error, map)
+ return GetStatusCommand().token
+
+ def getCapabilities(self, id, done):
+ service = self
+ class GetCapabilitiesCommand(Command):
+ def __init__(self):
+ super(GetCapabilitiesCommand, self).__init__(service.channel, service, "getCapabilities", (id,))
+ def done(self, error, args):
+ map = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ map = args[1]
+ done.doneGetCapabilities(self.token, error, map)
+ return GetCapabilitiesCommand().token
+
+ def addListener(self, listener):
+ l = ChannelEventListener(self, listener)
+ self.channel.addEventListener(self, l)
+ self.listeners[listener] = l
+
+ def removeListener(self, listener):
+ l = self.listeners.get(listener)
+ if l:
+ del self.listeners[listener]
+ self.channel.removeEventListener(self, l)
diff --git a/python/src/tcf/services/remote/LocatorProxy.py b/python/src/tcf/services/remote/LocatorProxy.py new file mode 100644 index 000000000..f95303bb5 --- /dev/null +++ b/python/src/tcf/services/remote/LocatorProxy.py @@ -0,0 +1,147 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import exceptions
+from tcf import protocol, locator, peer, channel
+from tcf.channel.Command import Command
+
+class Peer(peer.TransientPeer):
+ def __init__(self, parent, attrs):
+ super(Peer, self).__init__(attrs)
+ self.parent = parent
+ def openChannel(self):
+ assert protocol.isDispatchThread()
+ c = self.parent.openChannel()
+ c.redirect(self.getID())
+ return c
+
+class ChannelEventListener(channel.EventListener):
+ def __init__(self, proxy):
+ self.proxy = proxy
+ self.channel = proxy.channel
+ def event(self, name, data):
+ try:
+ args = channel.fromJSONSequence(data)
+ if name == "peerAdded":
+ assert len(args) == 1
+ peer = Peer(self.channel.getRemotePeer(), args[0])
+ if self.proxy.peers.get(peer.getID()):
+ protocol.log("Invalid peerAdded event", exceptions.Exception())
+ return
+ self.proxy.peers[peer.getID()] = peer
+ for l in self.proxy.listeners:
+ try:
+ l.peerAdded(peer)
+ except exceptions.Exception as x:
+ protocol.log("Unhandled exception in Locator listener", x)
+ elif name == "peerChanged":
+ assert len(args) == 1
+ m = args[0]
+ if not m: raise exceptions.Exception("Locator service: invalid peerChanged event - no peer ID")
+ peer = self.proxy.peers.get(m.get(peer.ATTR_ID))
+ if not peer: return
+ self.proxy.peers[peer.getID()] = peer
+ for l in self.proxy.listeners:
+ try:
+ l.peerChanged(peer)
+ except exceptions.Exception as x:
+ protocol.log("Unhandled exception in Locator listener", x)
+ elif name == "peerRemoved":
+ assert len(args) == 1
+ id = args[0]
+ peer = self.proxy.peers.get(id)
+ if not peer: return
+ del self.proxy.peers[id]
+ for l in self.proxy.listeners:
+ try:
+ l.peerRemoved(id)
+ except exceptions.Exception as x:
+ protocol.log("Unhandled exception in Locator listener", x)
+ elif name == "peerHeartBeat":
+ assert len(args) == 1
+ id = args[0]
+ peer = self.proxy.peers.get(id)
+ if not peer: return
+ for l in self.proxy.listeners:
+ try:
+ l.peerHeartBeat(id)
+ except exceptions.Exception as x:
+ protocol.log("Unhandled exception in Locator listener", x)
+ else:
+ raise exceptions.IOError("Locator service: unknown event: " + name)
+ except exceptions.Exception as x:
+ self.channel.terminate(x)
+
+class LocatorProxy(locator.LocatorService):
+ peers = {}
+ listeners = []
+ get_peers_done = False
+
+ def __init__(self, channel):
+ self.channel = channel;
+ self.event_listener = ChannelEventListener(self)
+ channel.addEventListener(self, self.event_listener)
+
+ def getPeers(self):
+ return self.peers
+
+ def redirect(self, peer, done):
+ service = self
+ class RedirectCommand(Command):
+ def __init__(self):
+ super(RedirectCommand, self).__init__(service.channel, service, "redirect", [peer])
+ def done(self, error, args):
+ if not error:
+ assert len(args) == 1
+ error = self.toError(args[0])
+ done.doneRedirect(self.token, error)
+ return RedirectCommand().token
+
+ def sync(self, done):
+ service = self
+ class SyncCommand(Command):
+ def __init__(self):
+ super(SyncCommand, self).__init__(service.channel, service, "sync", None)
+ def done(self, error, args):
+ if error: service.channel.terminate(error)
+ done.doneSync(self.token)
+ return SyncCommand().token
+
+ def addListener(self, listener):
+ self.listeners.add(listener)
+ if not self.get_peers_done:
+ service = self
+ class GetPeersCommand(Command):
+ def __init__(self):
+ super(GetPeersCommand, self).__init__(service.channel, service, "getPeers", None)
+ def done(self, error, args):
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ if error:
+ protocol.log("Locator error", error)
+ return
+ c = args[1]
+ if c:
+ for m in c:
+ id = m.get(peer.ATTR_ID)
+ if service.peers.get(id): continue;
+ peer = Peer(service.channel.getRemotePeer(), m)
+ service.peers[id] = peer
+ for l in service.listeners:
+ try:
+ l.peerAdded(peer)
+ except exceptions.Exception as x:
+ protocol.log("Unhandled exception in Locator listener", x)
+ self.get_peers_done = True
+
+ def removeListener(self, listener):
+ self.listeners.remove(listener)
diff --git a/python/src/tcf/services/remote/RunControlProxy.py b/python/src/tcf/services/remote/RunControlProxy.py new file mode 100644 index 000000000..6b1103542 --- /dev/null +++ b/python/src/tcf/services/remote/RunControlProxy.py @@ -0,0 +1,152 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import exceptions
+from tcf import channel
+from tcf.services import runcontrol
+from tcf.channel.Command import Command
+
+class RunContext(runcontrol.RunControlContext):
+ def __init__(self, service, props):
+ super(RunContext, self).__init__(props)
+ self.service = service
+
+ def getState(self, done):
+ service = self.service
+ id = self.getID()
+ class GetStateCommand(Command):
+ def __init__(self):
+ super(GetStateCommand, self).__init__(service.channel, service, "getState", (id,))
+ def done(self, error, args):
+ susp = False
+ pc = None
+ reason = None
+ map = None
+ if not error:
+ assert len(args) == 5
+ error = self.toError(args[0])
+ susp = args[1]
+ if args[2]: pc = str(args[2])
+ reason = args[3]
+ map = args[4]
+ done.doneGetState(self.token, error, susp, pc, reason, map)
+ return GetStateCommand().token
+
+# def resume(self, mode, count, done):
+# return self._command("resume", [self.getID(), mode, count], done)
+
+ def resume(self, mode, count, params, done):
+ if not params:
+ return self._command("resume", (self.getID(), mode, count), done)
+ else:
+ return self._command("resume", (self.getID(), mode, count, params), done)
+
+ def suspend(self, done):
+ return self._command("suspend", (self.getID(),), done)
+
+ def terminate(self, done):
+ return self._command("terminate", (self.getID(),), done)
+
+ def _command(self, cmd, args, done):
+ service = self.service
+ class RCCommand(Command):
+ def __init__(self, cmd, args):
+ super(RCCommand, self).__init__(service.channel, service, cmd, args)
+ def done(self, error, args):
+ if not error:
+ assert len(args) == 1
+ error = self.toError(args[0])
+ done.doneCommand(self.token, error)
+ return RCCommand(cmd, args).token
+
+ def __str__(self):
+ return "[Run Control Context %s]" % str(self._props)
+
+
+class ChannelEventListener(channel.EventListener):
+ def __init__(self, service, listener):
+ self.service = service
+ self.listener = listener
+ def event(self, name, data):
+ try:
+ args = channel.fromJSONSequence(data)
+ if name == "contextSuspended":
+ assert len(args) == 4
+ self.listener.contextSuspended(args[0], args[1], args[2], args[3])
+ elif name == "contextResumed":
+ assert len(args) == 1
+ self.listener.contextResumed(args[0])
+ elif name == "contextAdded":
+ assert len(args) == 1
+ self.listener.contextAdded(args[0])
+ elif name == "contextChanged":
+ assert len(args) == 1
+ self.listener.contextChanged(args[0])
+ elif name == "contextRemoved":
+ assert len(args) == 1
+ self.listener.contextRemoved(args[0])
+ elif name == "contextException":
+ assert len(args) == 2
+ self.listener.contextException(args[0], args[1])
+ elif name == "containerSuspended":
+ assert len(args) == 5
+ self.listener.containerSuspended(args[0], args[1], args[2], args[3], args[4])
+ elif name == "containerResumed":
+ assert len(args) == 1
+ self.listener.containerResumed(args[0])
+ else:
+ raise IOError("RunControl service: unknown event: " + name);
+ except exceptions.Exception as x:
+ self.service.channel.terminate(x)
+
+class RunControlProxy(runcontrol.RunControlService):
+ def __init__(self, channel):
+ self.channel = channel
+ self.listeners = {}
+
+ def addListener(self, listener):
+ l = ChannelEventListener(self, listener)
+ self.channel.addEventListener(self, l)
+ self.listeners[listener] = l
+
+ def removeListener(self, listener):
+ l = self.listeners.get(listener)
+ if l:
+ del self.listeners[listener]
+ self.channel.removeEventListener(self, l)
+
+ def getContext(self, context_id, done):
+ service = self
+ class GetContextCommand(Command):
+ def __init__(self):
+ super(GetContextCommand, self).__init__(service.channel, service, "getContext", (context_id,))
+ def done(self, error, args):
+ ctx = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ if args[1]: ctx = RunContext(service, args[1])
+ done.doneGetContext(self.token, error, ctx)
+ return GetContextCommand().token
+
+ def getChildren(self, parent_context_id, done):
+ service = self
+ class GetChildrenCommand(Command):
+ def __init__(self):
+ super(GetChildrenCommand, self).__init__(service.channel, service, "getChildren", (parent_context_id,))
+ def done(self, error, args):
+ contexts = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ contexts = args[1]
+ done.doneGetChildren(self.token, error, contexts)
+ return GetChildrenCommand().token
diff --git a/python/src/tcf/services/remote/StackTraceProxy.py b/python/src/tcf/services/remote/StackTraceProxy.py new file mode 100644 index 000000000..350db8295 --- /dev/null +++ b/python/src/tcf/services/remote/StackTraceProxy.py @@ -0,0 +1,52 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+from tcf.services import stacktrace
+from tcf.channel.Command import Command
+
+class StackTraceProxy(stacktrace.StackTraceService):
+ def __init__(self, channel):
+ self.channel = channel
+
+ def getChildren(self, parent_context_id, done):
+ service = self
+ class GetChildrenCommand(Command):
+ def __init__(self):
+ super(GetChildrenCommand, self).__init__(service.channel, service, "getChildren", (parent_context_id,))
+ def done(self, error, args):
+ contexts = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ contexts = args[1]
+ done.doneGetChildren(self.token, error, contexts)
+ return GetChildrenCommand().token
+
+ def getContext(self, ids, done):
+ service = self
+ class GetContextCommand(Command):
+ def __init__(self):
+ super(GetContextCommand, self).__init__(service.channel, service, "getContext", (ids,))
+ def done(self, error, args):
+ ctxs = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[1])
+ ctxs = service.toContextArray(args[0])
+ done.doneGetContext(self.token, error, ctxs)
+ return GetContextCommand().token
+
+ def toContextArray(self, ctxProps):
+ if ctxProps is None: return None
+ ctxs = []
+ for props in ctxProps:
+ ctxs.append(stacktrace.StackTraceContext(props))
+ return ctxs
diff --git a/python/src/tcf/services/remote/SymbolsProxy.py b/python/src/tcf/services/remote/SymbolsProxy.py new file mode 100644 index 000000000..7debc1bc7 --- /dev/null +++ b/python/src/tcf/services/remote/SymbolsProxy.py @@ -0,0 +1,115 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+from tcf import channel
+from tcf.services import symbols
+from tcf.channel.Command import Command
+
+
+class Context(symbols.Symbol):
+ def __init__(self, props):
+ super(Context, self).__init__(props)
+ self.value = channel.toByteArray(props.get(symbols.PROP_VALUE))
+
+ def getValue(self):
+ return self.value
+
+
+class SymbolsProxy(symbols.SymbolsService):
+ def __init__(self, channel):
+ self.channel = channel
+
+ def getContext(self, id, done):
+ service = self
+ class GetContextCommand(Command):
+ def __init__(self):
+ super(GetContextCommand, self).__init__(service.channel, service, "getContext", (id,))
+ def done(self, error, args):
+ ctx = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ if args[1]: ctx = Context(args[1])
+ done.doneGetContext(self.token, error, ctx)
+ return GetContextCommand().token
+
+ def getChildren(self, parent_context_id, done):
+ service = self
+ class GetChildrenCommand(Command):
+ def __init__(self):
+ super(GetChildrenCommand, self).__init__(service.channel, service, "getChildren", (parent_context_id,))
+ def done(self, error, args):
+ contexts = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ contexts = args[1]
+ done.doneGetChildren(self.token, error, contexts)
+ return GetChildrenCommand().token
+
+ def find(self, context_id, ip, name, done):
+ service = self
+ class FindCommand(Command):
+ def __init__(self):
+ super(FindCommand, self).__init__(service.channel, service, "find", (context_id, ip, name))
+ def done(self, error, args):
+ id = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ id = args[1]
+ done.doneFind(self.token, error, id)
+ return FindCommand().token
+
+ def findByAddr(self, context_id, addr, done):
+ service = self
+ class FindByAddrCommand(Command):
+ def __init__(self):
+ super(FindByAddrCommand, self).__init__(service.channel, service, "findByAddr", (context_id, addr))
+ def done(self, error, args):
+ id = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ id = args[1]
+ done.doneFind(self.token, error, id)
+ return FindByAddrCommand().token
+
+ def list(self, context_id, done):
+ service = self
+ class ListCommand(Command):
+ def __init__(self):
+ super(ListCommand, self).__init__(service.channel, service, "list", (context_id,))
+ def done(self, error, args):
+ lst = None
+ if not error:
+ assert len(args) == 2
+ error = self.toError(args[0])
+ lst = args[1]
+ done.doneList(self.token, error, lst)
+ return ListCommand().token
+
+ def findFrameInfo(self, context_id, address, done):
+ service = self
+ class FindFrameInfoCommand(Command):
+ def __init__(self):
+ super(FindFrameInfoCommand, self).__init__(service.channel, service, "findFrameInfo", (context_id, address))
+ def done(self, error, args):
+ address = None
+ size = None
+ fp_cmds = None
+ reg_cmds = None
+ if not error:
+ assert len(args) == 5
+ error = self.toError(args[0])
+ address, size, fp_cmds, reg_cmds = args[1:4]
+ done.doneList(self.token, error, address, size, fp_cmds, reg_cmds)
+ return FindFrameInfoCommand().token
diff --git a/python/src/tcf/services/remote/__init__.py b/python/src/tcf/services/remote/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/python/src/tcf/services/remote/__init__.py diff --git a/python/src/tcf/services/runcontrol.py b/python/src/tcf/services/runcontrol.py new file mode 100644 index 000000000..085f5e28f --- /dev/null +++ b/python/src/tcf/services/runcontrol.py @@ -0,0 +1,480 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import exceptions
+from tcf import services
+
+NAME = "RunControl"
+
+# Context property names.
+# Run control context ID */
+PROP_ID = "ID"
+
+# Context parent (owner) ID, for a thread it is same as process ID */
+PROP_PARENT_ID = "ParentID"
+
+# Context process (memory space) ID */
+PROP_PROCESS_ID = "ProcessID"
+
+# ID of a context that created this context */
+PROP_CREATOR_ID = "CreatorID"
+
+# Human readable context name */
+PROP_NAME = "Name"
+
+# true if the context is a container. Container can propagate run control commands to his children */
+PROP_IS_CONTAINER = "IsContainer"
+
+# true if context has execution state - can be suspended/resumed */
+PROP_HAS_STATE = "HasState"
+
+# Bit-set of RM_ values that are supported by the context */
+PROP_CAN_RESUME = "CanResume"
+
+# Bit-set of RM_ values that can be used with count > 1 */
+PROP_CAN_COUNT = "CanCount"
+
+# true if suspend command is supported by the context */
+PROP_CAN_SUSPEND = "CanSuspend"
+
+# true if terminate command is supported by the context */
+PROP_CAN_TERMINATE = "CanTerminate"
+
+# Context ID of a run control group that contains the context.
+# Members of same group are always suspended and resumed together:
+# resuming/suspending a context resumes/suspends all members of the group */
+PROP_RC_GROUP = "RCGroup"
+
+# Context resume modes.
+RM_RESUME = 0
+
+# Step over a single instruction.
+# If the instruction is a function call then don't stop until the function returns.
+
+RM_STEP_OVER = 1
+
+# Step a single instruction.
+# If the instruction is a function call then stop at first instruction of the function.
+RM_STEP_INTO = 2
+
+# Step over a single source code line.
+# If the line contains a function call then don't stop until the function returns.
+RM_STEP_OVER_LINE = 3
+
+# Step a single source code line.
+# If the line contains a function call then stop at first line of the function.
+RM_STEP_INTO_LINE = 4
+
+# Run until control returns from current function.
+RM_STEP_OUT = 5
+
+# Start running backwards.
+# Execution will continue until suspended by command or breakpoint.
+RM_REVERSE_RESUME = 6
+
+# Reverse of RM_STEP_OVER - run backwards over a single instruction.
+# If the instruction is a function call then don't stop until get out of the function.
+RM_REVERSE_STEP_OVER = 7
+
+# Reverse of RM_STEP_INTO.
+# This effectively "un-executes" the previous instruction
+RM_REVERSE_STEP_INTO = 8
+
+# Reverse of RM_STEP_OVER_LINE.
+# Resume backward execution of given context until control reaches an instruction that belongs
+# to a different source line.
+# If the line contains a function call then don't stop until get out of the function.
+# Error is returned if line number information not available.
+RM_REVERSE_STEP_OVER_LINE = 9
+
+# Reverse of RM_STEP_INTO_LINE,
+# Resume backward execution of given context until control reaches an instruction that belongs
+# to a different line of source code.
+# If a function is called, stop at the beginning of the last line of the function code.
+# Error is returned if line number information not available.
+RM_REVERSE_STEP_INTO_LINE = 10
+
+# Reverse of RM_STEP_OUT.
+# Resume backward execution of the given context until control reaches the point where the current function was called.
+RM_REVERSE_STEP_OUT = 11
+
+# Step over instructions until PC is outside the specified range.
+# Any function call within the range is considered to be in range.
+RM_STEP_OVER_RANGE = 12
+
+# Step instruction until PC is outside the specified range for any reason.
+RM_STEP_INTO_RANGE = 13
+
+# Reverse of RM_STEP_OVER_RANGE
+RM_REVERSE_STEP_OVER_RANGE = 14
+
+# Reverse of RM_STEP_INTO_RANGE
+RM_REVERSE_STEP_INTO_RANGE = 15
+
+# Run until the context becomes active - scheduled to run on a target CPU
+RM_UNTIL_ACTIVE = 16
+
+# Run reverse until the context becomes active
+RM_REVERSE_UNTIL_ACTIVE = 17
+
+# State change reason of a context.
+# Reason can be any text, but if it is one of predefined strings,
+# a generic client might be able to handle it better.
+REASON_USER_REQUEST = "Suspended"
+REASON_STEP = "Step"
+REASON_BREAKPOINT = "Breakpoint"
+REASON_EXCEPTION = "Exception"
+REASON_CONTAINER = "Container"
+REASON_WATCHPOINT = "Watchpoint"
+REASON_SIGNAL = "Signal"
+REASON_SHAREDLIB = "Shared Library"
+REASON_ERROR = "Error"
+
+# Optional parameters of context state.
+STATE_SIGNAL = "Signal"
+STATE_SIGNAL_NAME = "SignalName"
+STATE_SIGNAL_DESCRIPTION = "SignalDescription"
+STATE_BREAKPOINT_IDS = "BPs"
+STATE_PC_ERROR = "PCError"
+
+# Optional parameters of resume command.
+# Integer - starting address of step range, inclusive */
+RP_RANGE_START = "RangeStart"
+
+# Integer - ending address of step range, exclusive */
+RP_RANGE_END = "RangeEnd"
+
+class RunControlService(services.Service):
+ def getName(self):
+ return NAME
+
+ def getContext(self, id, done):
+ """
+ Retrieve context properties for given context ID.
+
+ @param id - context ID.
+ @param done - callback interface called when operation is completed.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def getChildren(self, parent_context_id, done):
+ """
+ Retrieve children of given context.
+
+ @param parent_context_id - parent context ID. Can be null -
+ to retrieve top level of the hierarchy, or one of context IDs retrieved
+ by previous getContext or getChildren commands.
+ @param done - callback interface called when operation is completed.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def addListener(self, listener):
+ """
+ Add run control event listener.
+ @param listener - run control event listener to add.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def removeListener(self, listener):
+ """
+ Remove run control event listener.
+ @param listener - run control event listener to remove.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+
+class RunControlError(exceptions.Exception):
+ pass
+
+class DoneGetState(object):
+ def doneGetState(self, token, error, suspended, pc, reason, params):
+ """
+ Called when getState command execution is complete.
+ @param token - pending command handle.
+ @param error - command execution error or null.
+ @param suspended - true if the context is suspended
+ @param pc - program counter of the context (if suspended).
+ @param reason - suspend reason (if suspended), see REASON_*.
+ @param params - additional target specific data about context state, see STATE_*.
+ """
+ pass
+
+class DoneCommand(object):
+ def doneCommand(self, token, error):
+ """
+ Called when run control command execution is complete.
+ @param token - pending command handle.
+ @param error - command execution error or null.
+ """
+ pass
+
+class DoneGetContext(object):
+ "Client callback interface for getContext()."
+ def doneGetContext(self, token, error, context):
+ """
+ Called when context data retrieval is done.
+ @param error - error description if operation failed, null if succeeded.
+ @param context - context data.
+ """
+ pass
+
+class DoneGetChildren(object):
+ "Client callback interface for getChildren()."
+ def doneGetChildren(self, token, error, context_ids):
+ """
+ Called when context list retrieval is done.
+ @param error - error description if operation failed, null if succeeded.
+ @param context_ids - array of available context IDs.
+ """
+ pass
+
+class RunControlContext(object):
+ """
+ A context corresponds to an execution thread, process, address space, etc.
+ A context can belong to a parent context. Contexts hierarchy can be simple
+ plain list or it can form a tree. It is up to target agent developers to choose
+ layout that is most descriptive for a given target. Context IDs are valid across
+ all services. In other words, all services access same hierarchy of contexts,
+ with same IDs, however, each service accesses its own subset of context's
+ attributes and functionality, which is relevant to that service.
+ """
+ def __init__(self, props):
+ self._props = props or {}
+
+ def getProperties(self):
+ """
+ Get context properties. See PROP_* definitions for property names.
+ Context properties are read only, clients should not try to modify them.
+ @return Map of context properties.
+ """
+ return self._props
+
+ def getID(self):
+ """
+ Retrieve context ID.
+ Same as getProperties().get('ID')
+ """
+ return self._props.get(PROP_ID)
+
+ def getParentID(self):
+ """
+ Retrieve parent context ID.
+ Same as getProperties().get('ParentID')
+ """
+ return self._props.get(PROP_PARENT_ID)
+
+ def getProcessID(self):
+ """
+ Retrieve context process ID.
+ Same as getProperties().get('ProcessID')
+ """
+ return self._props.get(PROP_PROCESS_ID)
+
+ def getCreatorID(self):
+ """
+ Retrieve context creator ID.
+ Same as getProperties().get('CreatorID')
+ """
+ return self._props.get(PROP_CREATOR_ID)
+
+ def getName(self):
+ """
+ Retrieve human readable context name.
+ Same as getProperties().get('Name')
+ """
+ return self._props.get(PROP_NAME)
+
+ def isContainer(self):
+ """
+ Utility method to read context property PROP_IS_CONTAINER.
+ Executing resume or suspend command on a container causes all its children to resume or suspend.
+ @return value of PROP_IS_CONTAINER.
+ """
+ return self._props.get(PROP_IS_CONTAINER)
+
+ def hasState(self):
+ """
+ Utility method to read context property PROP_HAS_STATE.
+ Only context that has a state can be resumed or suspended.
+ @return value of PROP_HAS_STATE.
+ """
+ return self._props.get(PROP_HAS_STATE)
+
+ def canSuspend(self):
+ """
+ Utility method to read context property PROP_CAN_SUSPEND.
+ Value 'true' means suspend command is supported by the context,
+ however the method does not check that the command can be executed successfully in
+ the current state of the context. For example, the command still can fail if context is
+ already suspended.
+ @return value of PROP_CAN_SUSPEND.
+ """
+ return self._props.get(PROP_CAN_SUSPEND)
+
+ def canResume(self, mode):
+ """
+ Utility method to read a 'mode' bit in context property PROP_CAN_RESUME.
+ Value 'true' means resume command is supported by the context,
+ however the method does not check that the command can be executed successfully in
+ the current state of the context. For example, the command still can fail if context is
+ already resumed.
+ @param mode - resume mode, see RM_*.
+ @return value of requested bit of PROP_CAN_RESUME.
+ """
+ b = self._props.get(PROP_CAN_RESUME) or 0
+ return (b & (1 << mode)) != 0
+
+ def canCount(self, mode):
+ """
+ Utility method to read a 'mode' bit in context property PROP_CAN_COUNT.
+ Value 'true' means resume command with count other then 1 is supported by the context,
+ however the method does not check that the command can be executed successfully in
+ the current state of the context. For example, the command still can fail if context is
+ already resumed.
+ @param mode - resume mode, see RM_*.
+ @return value of requested bit of PROP_CAN_COUNT.
+ """
+ b = self._props.get(PROP_CAN_COUNT) or 0
+ return (b & (1 << mode)) != 0
+
+ def canTerminate(self):
+ """
+ Utility method to read context property PROP_CAN_TERMINATE.
+ Value 'true' means terminate command is supported by the context,
+ however the method does not check that the command can be executed successfully in
+ the current state of the context. For example, the command still can fail if context is
+ already exited.
+ @return value of PROP_CAN_SUSPEND.
+ """
+ return self._props.get(PROP_CAN_TERMINATE)
+
+ def getRCGroup(self):
+ """
+ Utility method to read context property PROP_RC_GROUP -
+ context ID of a run control group that contains the context.
+ Members of same group are always suspended and resumed together:
+ resuming/suspending a context resumes/suspends all members of the group.
+ @return value of PROP_RC_GROUP.
+ """
+ return self._props.get(PROP_RC_GROUP)
+
+ def getState(self, done):
+ """
+ Send a command to retrieve current state of a context.
+ @param done - command result call back object.
+ @return pending command handle, can be used to cancel the command.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def suspend(self, done):
+ """
+ Send a command to suspend a context.
+ Also suspends children if context is a container.
+ @param done - command result call back object.
+ @return pending command handle, can be used to cancel the command.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+# def resume(self, mode, count, done):
+# """
+# Send a command to resume a context.
+# Also resumes children if context is a container.
+# @param mode - defines how to resume the context, see RM_*.
+# @param count - if mode implies stepping, defines how many steps to perform.
+# @param done - command result call back object.
+# @return pending command handle, can be used to cancel the command.
+# """
+# raise exceptions.NotImplementedError("Abstract method")
+
+ def resume(self, mode, count, params, done):
+ """
+ Send a command to resume a context.
+ Also resumes children if context is a container.
+ @param mode - defines how to resume the context, see RM_*.
+ @param count - if mode implies stepping, defines how many steps to perform.
+ @param params - resume parameters, for example, step range definition, see RP_*.
+ @param done - command result call back object.
+ @return pending command handle, can be used to cancel the command.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def terminate(self, done):
+ """
+ Send a command to terminate a context.
+ @param done - command result call back object.
+ @return pending command handle, can be used to cancel the command.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+class RunControlListener(object):
+ "Service events listener interface."
+ def contextAdded(self, contexts):
+ """
+ Called when new contexts are created.
+ @param contexts - array of new context properties.
+ """
+ pass
+ def contextChanged(self, contexts):
+ """
+ Called when a context properties changed.
+ @param contexts - array of new context properties.
+ """
+ pass
+ def contextRemoved(self, context_ids):
+ """
+ Called when contexts are removed.
+ @param context_ids - array of removed context IDs.
+ """
+ pass
+ def contextSuspended(self, context, pc, reason, params):
+ """
+ Called when a thread is suspended.
+ @param context - ID of a context that was suspended.
+ @param pc - program counter of the context, can be null.
+ @param reason - human readable description of suspend reason.
+ @param params - additional, target specific data about suspended context.
+ """
+ pass
+ def contextResumed(self, context):
+ """
+ Called when a thread is resumed.
+ @param context - ID of a context that was resumed.
+ """
+ pass
+ def containerSuspended(self, context, pc, reason, params, suspended_ids):
+ """
+ Called when target simultaneously suspends multiple threads in a container
+ (process, core, etc.).
+
+ @param context - ID of a context responsible for the event. It can be container ID or
+ any one of container children, for example, it can be thread that hit "suspend all" breakpoint.
+ Client expected to move focus (selection) to this context.
+ @param pc - program counter of the context.
+ @param reason - suspend reason, see REASON_*.
+ @param params - additional target specific data about context state, see STATE_*.
+ @param suspended_ids - full list of all contexts that were suspended.
+ """
+ pass
+ def containerResumed(self, context_ids):
+ """
+ Called when target simultaneously resumes multiple threads in a container (process,
+ core, etc.).
+
+ @param context_ids - full list of all contexts that were resumed.
+ """
+ pass
+ def contextException(self, context, msg):
+ """
+ Called when an exception is detected in a target thread.
+ @param context - ID of a context that caused an exception.
+ @param msg - human readable description of the exception.
+ """
+ pass
diff --git a/python/src/tcf/services/stacktrace.py b/python/src/tcf/services/stacktrace.py new file mode 100644 index 000000000..ae8a6d5cf --- /dev/null +++ b/python/src/tcf/services/stacktrace.py @@ -0,0 +1,160 @@ +#******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+#******************************************************************************
+
+import exceptions
+from tcf import services
+
+NAME = "StackTrace"
+
+#
+# Stack frame context property names.
+#
+PROP_ID = "ID" # String, stack frame ID
+PROP_PARENT_ID = "ParentID" # String, stack frame parent ID
+PROP_PROCESS_ID = "ProcessID" # String, stack frame process ID
+PROP_NAME = "Name" # String, human readable name
+PROP_TOP_FRAME = "TopFrame" # Boolean, true if the frame is top frame on a stack
+PROP_LEVEL = "Level" # Integer, stack frame level, starting from stack bottom
+PROP_FRAME_ADDRESS = "FP" # Number, stack frame memory address
+PROP_RETURN_ADDRESS = "RP" # Number, return address
+PROP_INSTRUCTION_ADDRESS = "IP" # Number, instruction pointer
+PROP_ARGUMENTS_COUNT = "ArgsCnt" # Integer, number of function arguments
+PROP_ARGUMENTS_ADDRESS = "ArgsAddr" # Number, memory address of function arguments
+
+class StackTraceService(services.Service):
+ def getName(self):
+ return NAME
+
+ def getContext(self, ids, done):
+ """
+ Retrieve context info for given context IDs.
+
+ The command will fail if parent thread is not suspended.
+ Client can use Run Control service to suspend a thread.
+
+ @param ids - array of context IDs.
+ @param done - call back interface called when operation is completed.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def getChildren(self, parent_context_id, done):
+ """
+ Retrieve stack trace context list.
+ Parent context usually corresponds to an execution thread.
+ Some targets have more then one stack. In such case children of a thread
+ are stacks, and stack frames are deeper in the hierarchy - they can be
+ retrieved with additional getChildren commands.
+
+ The command will fail if parent thread is not suspended.
+ Client can use Run Control service to suspend a thread.
+
+ @param parent_context_id - parent context ID.
+ @param done - call back interface called when operation is completed.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+class DoneGetContext(object):
+ """
+ Client call back interface for getContext().
+ """
+ def doneGetContext(self, token, error, contexts):
+ """
+ Called when context data retrieval is done.
+ @param error - error description if operation failed, null if succeeded.
+ @param contexts - array of context data or null if error.
+ """
+ pass
+
+class DoneGetChildren(object):
+ """
+ Client call back interface for getChildren().
+ """
+ def doneGetChildren(self, token, error, context_ids):
+ """
+ Called when context list retrieval is done.
+ @param error - error description if operation failed, null if succeeded.
+ @param context_ids - array of available context IDs.
+ Stack frames are ordered from stack bottom to top.
+ """
+ pass
+
+class StackTraceContext(object):
+ """
+ StackTraceContext represents stack trace objects - stacks and stack frames.
+ """
+ def __init__(self, props):
+ self._props = props or {}
+
+ def getID(self):
+ """
+ Get Context ID.
+ @return context ID.
+ """
+ return self._props.get(PROP_ID)
+
+ def getParentID(self):
+ """
+ Get parent context ID.
+ @return parent context ID.
+ """
+ return self._props.get(PROP_PARENT_ID)
+
+ def getName(self):
+ """
+ Get context name - if context represents a stack.
+ @return context name or null.
+ """
+ return self._props.get(PROP_NAME)
+
+ def getFrameAddress(self):
+ """
+ Get memory address of this frame.
+ @return address or None if not a stack frame.
+ """
+ return self._props.get(PROP_FRAME_ADDRESS)
+
+ def getReturnAddress(self):
+ """
+ Get program counter saved in this stack frame -
+ it is address of instruction to be executed when the function returns.
+ @return return address or null if not a stack frame.
+ """
+ return self._props.get(PROP_RETURN_ADDRESS)
+
+ def getInstructionAddress(self):
+ """
+ Get address of the next instruction to be executed in this stack frame.
+ For top frame it is same as PC register value.
+ For other frames it is same as return address of the next frame.
+ @return instruction address or null if not a stack frame.
+ """
+ return self._props.get(PROP_INSTRUCTION_ADDRESS)
+
+ def getArgumentsCount(self):
+ """
+ Get number of function arguments for this frame.
+ @return function arguments count.
+ """
+ return self._props.get(PROP_ARGUMENTS_COUNT)
+
+ def getArgumentsAddress(self):
+ """
+ Get address of function arguments area in memory.
+ @return function arguments address or null if not available.
+ """
+ return self._props.get(PROP_ARGUMENTS_ADDRESS, 0)
+
+ def getProperties(self):
+ """
+ Get complete map of context properties.
+ @return map of context properties.
+ """
+ return self._props
diff --git a/python/src/tcf/services/symbols.py b/python/src/tcf/services/symbols.py new file mode 100644 index 000000000..3976956d3 --- /dev/null +++ b/python/src/tcf/services/symbols.py @@ -0,0 +1,381 @@ +#******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+#******************************************************************************
+
+import exceptions
+from tcf import services
+
+# Service name.
+NAME = "Symbols"
+
+class SymbolClass:
+ unknown = 0 # unknown symbol class
+ value = 1 # constant value
+ reference = 2 # variable data object
+ function = 3 # function body
+ type = 4 # a type
+
+class TypeClass:
+ unknown = 0 # unknown type class
+ cardinal = 1 # unsigned integer
+ integer = 2 # signed integer
+ real = 3 # float, double
+ pointer = 4 # pointer to anything.
+ array = 5 # array of anything.
+ composite = 6 # struct, union, or class.
+ enumeration = 7 # enumeration type.
+ function = 8 # function type.
+
+#
+# Symbol context property names.
+#
+PROP_ID = "ID"
+PROP_OWNER_ID = "OwnerID"
+PROP_UPDATE_POLICY = "UpdatePolicy"
+PROP_NAME = "Name"
+PROP_SYMBOL_CLASS = "Class"
+PROP_TYPE_CLASS = "TypeClass"
+PROP_TYPE_ID = "TypeID"
+PROP_BASE_TYPE_ID = "BaseTypeID"
+PROP_INDEX_TYPE_ID = "IndexTypeID"
+PROP_SIZE = "Size"
+PROP_LENGTH = "Length"
+PROP_LOWER_BOUND = "LowerBound"
+PROP_UPPER_BOUND = "UpperBound"
+PROP_OFFSET = "Offset"
+PROP_ADDRESS = "Address"
+PROP_VALUE = "Value"
+PROP_BIG_ENDIAN = "BigEndian"
+
+#
+# Symbol context properties update policies.
+#
+
+# Update policy "Memory Map": symbol properties become invalid when
+# memory map changes - when modules are loaded or unloaded.
+# Symbol OwnerID indicates memory space (process) that is invalidation events source.
+# Most static variables and types have this update policy.
+UPDATE_ON_MEMORY_MAP_CHANGES = 0
+
+# Update policy "Execution State": symbol properties become invalid when
+# execution state changes - a thread is suspended, resumed or exited.
+# Symbol OwnerID indicates executable context (thread) that is invalidation events source.
+# Most stack (auto) variables have this update policy.
+UPDATE_ON_EXE_STATE_CHANGES = 1
+
+
+class Symbol(object):
+ """
+ Symbol context interface.
+ """
+ def __init__(self, props):
+ self._props = props or {}
+
+ def getID(self):
+ """
+ Get symbol ID.
+ @return symbol ID.
+ """
+ return self._props.get(PROP_ID)
+
+ def getOwnerID(self):
+ """
+ Get symbol owner ID.
+ The owner can a thread or memory space (process).
+ Certain changes in owner state can invalidate cached symbol properties,
+ see getUpdatePolicy() and UPDATE_*.
+ """
+ return self._props.get(PROP_OWNER_ID)
+
+ def getUpdatePolicy(self):
+ """
+ Get symbol properties update policy ID.
+ Symbol properties can change during program execution.
+ If a client wants to cache symbols, it should invalidate cached data
+ according to update policies of cached symbols.
+ @return symbol update policy ID, see UPDATE_*
+ """
+ return self._props.get(PROP_UPDATE_POLICY)
+
+ def getName(self):
+ """
+ Get symbol name.
+ @return symbol name or null.
+ """
+ return self._props.get(PROP_NAME)
+
+ def getSymbolClass(self):
+ """
+ Get symbol class.
+ @return symbol class.
+ """
+ return self._props.get(PROP_SYMBOL_CLASS)
+
+ def getTypeClass(self):
+ """
+ Get symbol type class.
+ @return type class.
+ """
+ return self._props.get(PROP_TYPE_CLASS)
+
+ def getTypeID(self):
+ """
+ Get type ID.
+ If the symbol is a type and not a 'typedef', return same as getID().
+ @return type ID.
+ """
+ return self._props.get(PROP_TYPE_ID)
+
+ def getBaseTypeID(self):
+ """
+ Get base type ID.
+ If this symbol is a
+ pointer type - return pointed type
+ array type - return element type
+ function type - return function result type
+ class type - return base class
+ otherwise return null.
+ @return type ID.
+ """
+ return self._props.get(PROP_BASE_TYPE_ID)
+
+ def getIndexTypeID(self):
+ """
+ Get index type ID.
+ If this symbol is a
+ array type - return array index type
+ otherwise return null.
+ @return type ID.
+ """
+ return self._props.get(PROP_INDEX_TYPE_ID)
+
+ def getSize(self):
+ """
+ Return value size of the symbol (or type).
+ @return size in bytes.
+ """
+ return self._props.get(PROP_SIZE, 0)
+
+ def getLength(self):
+ """
+ If symbol is an array type - return number of elements.
+ @return number of elements.
+ """
+ return self._props.get(PROP_LENGTH, 0)
+
+ def getLowerBound(self):
+ """
+ If symbol is an array type - return array index lower bound.
+ @return lower bound.
+ """
+ return self._props.get(PROP_LOWER_BOUND)
+
+ def getUpperBound(self):
+ """
+ If symbol is an array type - return array index upper bound.
+ @return upper bound.
+ """
+ return self._props.get(PROP_UPPER_BOUND)
+
+ def getOffset(self):
+ """
+ Return offset from 'this' for member of class, struct or union.
+ @return offset in bytes.
+ """
+ return self._props.get(PROP_OFFSET, 0)
+
+ def getAddress(self):
+ """
+ Return address of the symbol.
+ @return address or null.
+ """
+ return self._props.get(PROP_ADDRESS)
+
+ def getValue(self):
+ """
+ If symbol is a constant object, return its value.
+ @return symbol value as array of bytes.
+ """
+ return self._props.get(PROP_VALUE)
+
+ def isBigEndian(self):
+ """
+ Get symbol values endianness.
+ @return true if symbol is big-endian.
+ """
+ return self._props.get(PROP_BIG_ENDIAN, False)
+
+ def getProperties(self):
+ """
+ Get complete map of context properties.
+ @return map of context properties.
+ """
+ return self._props
+
+class SymbolsService(services.Service):
+ def getName(self):
+ return NAME
+
+ def getContext(self, id, done):
+ """
+ Retrieve symbol context info for given symbol ID.
+ @see Symbol
+
+ @param id - symbol context ID.
+ @param done - call back interface called when operation is completed.
+ @return - pending command handle.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def getChildren(self, parent_context_id, done):
+ """
+ Retrieve children IDs for given parent ID.
+ Meaning of the operation depends on parent kind:
+ 1. struct, union, or class type - get fields
+ 2. enumeration type - get enumerators
+
+ @param parent_context_id - parent symbol context ID.
+ @param done - call back interface called when operation is completed.
+ @return - pending command handle.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def find(self, context_id, ip, name, done):
+ """
+ Search symbol with given name in given context.
+ The context can be memory space, process, thread or stack frame.
+
+ @param context_id - a search scope.
+ @param ip - instruction pointer - ignored if context_id is a stack frame ID
+ @param name - symbol name.
+ @param done - call back interface called when operation is completed.
+ @return - pending command handle.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def findByAddr(self, context_id, addr, done):
+ """
+ Search symbol with given address in given context.
+ The context can be memory space, process, thread or stack frame.
+
+ @param context_id - a search scope.
+ @param addr - symbol address.
+ @param done - call back interface called when operation is completed.
+ @return - pending command handle.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def list(self, context_id, done):
+ """
+ List all symbols in given context.
+ The context can be a stack frame.
+
+ @param context_id - a scope.
+ @param done - call back interface called when operation is completed.
+ @return - pending command handle.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def findFrameInfo(self, context_id, address, done):
+ """
+ Retrieve stack tracing commands for given instruction address in a context memory.
+ @param context_id - exacutable context ID.
+ @param address - instruction address.
+ @param done - call back interface called when operation is completed.
+ @return - pending command handle.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+class DoneGetContext(object):
+ """
+ Client call back interface for getContext().
+ """
+ def doneGetContext(self, token, error, context):
+ """
+ Called when context data retrieval is done.
+ @param token - command handle
+ @param error - error description if operation failed, null if succeeded.
+ @param context - context properties.
+ """
+ pass
+class DoneGetChildren(object):
+ """
+ Client call back interface for getChildren().
+ """
+ def doneGetChildren(self, token, error, context_ids):
+ """
+ Called when context list retrieval is done.
+ @param token - command handle
+ @param error - error description if operation failed, null if succeeded.
+ @param context_ids - array of available context IDs.
+ """
+ pass
+
+class DoneFind(object):
+ """
+ Client call back interface for find().
+ """
+ def doneFind(self, token, error, symbol_id):
+ """
+ Called when symbol search is done.
+ @param token - command handle.
+ @param error - error description if operation failed, null if succeeded.
+ @param symbol_id - symbol ID.
+ """
+ pass
+
+class DoneList(object):
+ """
+ Client call back interface for list().
+ """
+ def doneList(self, token, error, symbol_ids):
+ """
+ Called when symbol list retrieval is done.
+ @param token - command handle.
+ @param error - error description if operation failed, null if succeeded.
+ @param symbol_ids - array of symbol IDs.
+ """
+
+
+#
+# Command codes that are used to calculate frame pointer and register values during stack tracing.
+#
+
+# Load a number to the evaluation stack. Command argument is the number.
+CMD_NUMBER = 1
+
+# Load a register value to the evaluation stack. Command argument is the register ID.
+CMD_REGISTER = 2
+
+# Load frame address to the evaluation stack.
+CMD_FP = 3
+
+# Read memory at address on the top of the evaluation stack. Command arguments are
+# the value size (Number) and endianness (Boolean, false - little-endian, true - big-endian).
+CMD_DEREF = 4
+
+# Add two values on top of the evaluation stack
+CMD_ADD = 5
+
+class DoneFindFrameInfo(object):
+ """
+ Client call back interface for findFrameInfo().
+ """
+ def doneFindFrameInfo(self, token, error, address, size, fp_cmds, reg_cmds):
+ """
+ Called when stack tracing information retrieval is done.
+ @param token - command handle.
+ @param error - error description if operation failed, null if succeeded.
+ @param address - start of instruction address range
+ @param size - size of instruction address range
+ @param fp_cmds - commands to calculate stack frame pointer
+ @param reg_cmds - map register IDs -> commands to calculate register values
+ """
+ pass
diff --git a/python/src/tcf/shell.py b/python/src/tcf/shell.py new file mode 100644 index 000000000..ac5e3e65f --- /dev/null +++ b/python/src/tcf/shell.py @@ -0,0 +1,58 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import code, sys
+import tcf
+from tcf.util import sync, event
+from tcf import protocol, channel
+
+
+class Shell(code.InteractiveConsole, protocol.ChannelOpenListener, channel.ChannelListener):
+ def __init__(self):
+ locals = {
+ "tcf" : tcf,
+ "connect" : tcf.connect
+ }
+ sys.ps1 = "tcf> "
+ protocol.startEventQueue()
+ protocol.invokeAndWait(protocol.addChannelOpenListener, self)
+ code.InteractiveConsole.__init__(self, locals)
+ def interact(self, banner=None):
+ try:
+ super(Shell, self).interact(banner)
+ finally:
+ protocol.removeChannelOpenListener(self)
+ def onChannelOpen(self, channel):
+ wrapper = sync.DispatchWrapper(channel)
+ self.locals["channel"] = wrapper
+ self.locals["disconnect"] = wrapper.close
+ self.locals["cmd"] = sync.CommandControl(channel, interactive=True)
+ self.locals["events"] = event.EventRecorder(channel)
+ protocol.invokeAndWait(protocol.removeChannelOpenListener, self)
+ wrapper.addChannelListener(self)
+ def onChannelClosed(self, error):
+ del self.locals["channel"]
+ del self.locals["cmd"]
+ del self.locals["disconnect"]
+ del self.locals["events"]
+ protocol.addChannelOpenListener(self)
+
+def interact():
+ try:
+ # enable commandline editing if available
+ import readline
+ except ImportError:
+ pass
+ shell = Shell()
+ shell.interact("TCF Shell")
+
+if __name__ == "__main__":
+ interact()
diff --git a/python/src/tcf/tests/BasicTests.py b/python/src/tcf/tests/BasicTests.py new file mode 100644 index 000000000..b4da8fbac --- /dev/null +++ b/python/src/tcf/tests/BasicTests.py @@ -0,0 +1,292 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import sys, time, threading, exceptions
+import tcf
+from tcf import protocol, channel
+from tcf.util import sync
+
+class TraceListener(channel.TraceListener):
+ def onMessageReceived(self, type, token, service, name, data):
+ print "<<<", type, token, service, name, data
+ def onMessageSent(self, type, token, service, name, data):
+ print ">>>", type, token, service, name, data
+ def onChannelClosed(self, error):
+ print>>sys.stderr, "*** closed ***", error
+
+_suspended = []
+
+def test():
+ protocol.startEventQueue()
+ c = tcf.connect("TCP:127.0.0.1:1534")
+ assert c.state == channel.STATE_OPEN
+ #protocol.invokeLater(c.addTraceListener, TraceListener())
+ def r2():
+ print "services=", c.getRemoteServices()
+ protocol.invokeLater(r2)
+
+ testRunControl(c)
+ testBreakpoints(c)
+ testSymbols(c)
+ testSyncCommands(c)
+ testEvents(c)
+ testDataCache(c)
+
+ if c.state == channel.STATE_OPEN:
+ time.sleep(10)
+ protocol.invokeLater(c.close)
+ time.sleep(5)
+
+def testRunControl(c):
+ lock = threading.Condition()
+ from tcf.services import runcontrol
+ def r3():
+ rctrl = c.getRemoteService(runcontrol.NAME)
+ pending = []
+ class DoneGetContext(runcontrol.DoneGetContext):
+ def doneGetContext(self, token, error, context):
+ pending.remove(token)
+ if error:
+ protocol.log("Error from RunControl.getContext", error)
+ else:
+ print "ID: ", context.getID()
+ print "Name: ", context.getName()
+ print "Parent: ", context.getParentID()
+ print "IsContainer: ", context.isContainer()
+ class DoneGetState(runcontrol.DoneGetState):
+ def doneGetState(self, token, error, suspended, pc, reason, params):
+ pending.remove(token)
+ if error:
+ protocol.log("Error from RunControl.getState", error)
+ else:
+ print "suspended: ", suspended
+ print "pc: ", pc
+ print "reason: ", reason
+ print "params: ", params
+ if suspended:
+ _suspended.append(context.getID())
+ testStackTrace(c, context.getID())
+ if len(pending) == 0:
+ with lock:
+ lock.notify()
+ if context and context.hasState(): pending.append(context.getState(DoneGetState()))
+ if len(pending) == 0:
+ with lock:
+ lock.notify()
+ class DoneGetChildren(runcontrol.DoneGetChildren):
+ def doneGetChildren(self, token, error, context_ids):
+ pending.remove(token)
+ if error:
+ protocol.log("Error from RunControl.GetChildren", error)
+ else:
+ for c in context_ids:
+ pending.append(rctrl.getContext(c, DoneGetContext()))
+ pending.append(rctrl.getChildren(c, self))
+ if len(pending) == 0:
+ with lock:
+ lock.notify()
+ pending.append(rctrl.getChildren(None, DoneGetChildren()))
+ with lock:
+ protocol.invokeLater(r3)
+ lock.wait(5000)
+ def r4():
+ rc = c.getRemoteService(runcontrol.NAME)
+ class RCListener(runcontrol.RunControlListener):
+ def contextSuspended(self, *args):
+ print "context suspended: ", args
+ rc.removeListener(self)
+ def contextResumed(self, *args):
+ print "context resumed: ", args
+ def containerSuspended(self, *args):
+ print "container suspended:", args
+ rc.removeListener(self)
+ def containerResumed(self, *args):
+ print "container resumed:", args
+ rc.addListener(RCListener())
+ class DoneGetContext(runcontrol.DoneGetContext):
+ def doneGetContext(self, token, error, context):
+ if error:
+ protocol.log("Error from RunControl.getContext", error)
+ with lock: lock.notify()
+ return
+ class DoneResume(runcontrol.DoneCommand):
+ def doneCommand(self, token, error):
+ if error:
+ protocol.log("Error from RunControl.resume", error)
+ else:
+ context.suspend(runcontrol.DoneCommand())
+ with lock: lock.notify()
+ context.resume(runcontrol.RM_RESUME, 1, None, DoneResume())
+ rc.getContext(_suspended[0], DoneGetContext())
+
+ if _suspended:
+ with lock:
+ protocol.invokeLater(r4)
+ lock.wait(5000)
+
+def testBreakpoints(c):
+ from tcf.services import breakpoints
+ def r3():
+ bps = c.getRemoteService(breakpoints.NAME)
+ class DoneGetIDs(breakpoints.DoneGetIDs):
+ def doneGetIDs(self, token, error, ids):
+ if error:
+ protocol.log("Error from Breakpoints.getIDs", error)
+ return
+ print "Breakpoints :", ids
+ class DoneGetProperties(breakpoints.DoneGetProperties):
+ def doneGetProperties(self, token, error, props):
+ if error:
+ protocol.log("Error from Breakpoints.getProperties", error)
+ return
+ print "Breakpoint Properties: ", props
+ class DoneGetStatus(breakpoints.DoneGetStatus):
+ def doneGetProperties(self, token, error, props):
+ if error:
+ protocol.log("Error from Breakpoints.getStatus", error)
+ return
+ print "Breakpoint Status: ", props
+ for id in ids:
+ bps.getProperties(id, DoneGetProperties())
+ bps.getStatus(id, DoneGetStatus())
+ bps.getIDs(DoneGetIDs())
+ protocol.invokeLater(r3)
+ def r4():
+ bpsvc = c.getRemoteService(breakpoints.NAME)
+ class BPListener(breakpoints.BreakpointsListener):
+ def breakpointStatusChanged(self, id, status):
+ print "breakpointStatusChanged", id, status
+ def contextAdded(self, bps):
+ print "breakpointAdded", bps
+ bpsvc.removeListener(self)
+ def contextChanged(self, bps):
+ print "breakpointChanged", bps
+ def contextRemoved(self, ids):
+ print "breakpointRemoved", ids
+ bpsvc.addListener(BPListener())
+ class DoneSet(breakpoints.DoneCommand):
+ def doneCommand(self, token, error):
+ if error:
+ protocol.log("Error from Breakpoints.set", error)
+ return
+ bp = {
+ breakpoints.PROP_ID : "python:1",
+ breakpoints.PROP_ENABLED : True,
+ breakpoints.PROP_LOCATION : "sysClkRateGet"
+ }
+ bpsvc.set([bp], DoneSet())
+ protocol.invokeLater(r4)
+
+def testStackTrace(c, ctx_id):
+ from tcf.services import stacktrace
+ stack = c.getRemoteService(stacktrace.NAME)
+ class DoneGetChildren(stacktrace.DoneGetChildren):
+ def doneGetChildren(self, token, error, ctx_ids):
+ if error:
+ protocol.log("Error from StackTrace.getChildren", error)
+ return
+ class DoneGetContext(stacktrace.DoneGetContext):
+ def doneGetContext(self, token, error, ctxs):
+ if error:
+ protocol.log("Error from StackTrace.getContext", error)
+ return
+ if ctxs:
+ for ctx in ctxs:
+ print ctx.getProperties()
+ stack.getContext(ctx_ids, DoneGetContext())
+ stack.getChildren(ctx_id, DoneGetChildren())
+
+def testSymbols(c):
+ from tcf.services import symbols
+ def symTest(ctx_id):
+ syms = c.getRemoteService(symbols.NAME)
+ class DoneList(symbols.DoneList):
+ def doneList(self, token, error, ctx_ids):
+ if error:
+ protocol.log("Error from Symbols.list", error)
+ return
+ class DoneGetContext(symbols.DoneGetContext):
+ def doneGetContext(self, token, error, ctxs):
+ if error:
+ protocol.log("Error from Symbols.getContext", error)
+ return
+ if ctxs:
+ for ctx in ctxs:
+ print ctx.getProperties()
+ if ctx_ids:
+ for ctx_id in ctx_ids:
+ syms.getContext(ctx_id, DoneGetContext())
+ syms.list(ctx_id, DoneList())
+ for ctx_id in _suspended:
+ protocol.invokeLater(symTest, ctx_id)
+
+def testSyncCommands(c):
+ # simplified command execution
+ ctl = sync.CommandControl(c)
+ for ctx_id in _suspended:
+ print ctl.Symbols.list(ctx_id)
+ for ctx_id in _suspended:
+ frame_ids = ctl.StackTrace.getChildren(ctx_id).get()
+ if frame_ids:
+ error, args = ctl.StackTrace.getContext(frame_ids)
+ if not error: print args
+ def gotBreakpoints(error, bps):
+ print "Got breakpoint list:", bps
+ ctl.Breakpoints.getIDs(onDone=gotBreakpoints)
+ try:
+ print ctl.Processes.getChildren(None, False)
+ except:
+ pass # no Processes service
+
+def testEvents(c):
+ from tcf.util import event
+ recorder = event.EventRecorder(c)
+ recorder.record("RunControl")
+ ctl = sync.CommandControl(c)
+ rc = ctl.RunControl
+ ctx = rc.getChildren(None).get()[0]
+ rc.resume(ctx, 0, 1, None).wait()
+ print recorder
+ rc.suspend(ctx).wait()
+ print recorder
+ recorder.stop()
+
+def testDataCache(c):
+ from tcf.util import cache
+ from tcf.services import runcontrol
+ class ContextsCache(cache.DataCache):
+ def startDataRetrieval(self):
+ rc = self._channel.getRemoteService(runcontrol.NAME)
+ if not rc:
+ self.set(None, exceptions.Exception("No RunControl service"), None)
+ return
+ cache = self
+ pending = []
+ contexts = []
+ class DoneGetChildren(runcontrol.DoneGetChildren):
+ def doneGetChildren(self, token, error, context_ids):
+ pending.remove(token)
+ if error:
+ protocol.log("Error from RunControl.GetChildren", error)
+ else:
+ for c in context_ids:
+ contexts.append(c)
+ pending.append(rc.getChildren(c, self))
+ if len(pending) == 0:
+ cache.set(None, None, contexts)
+ pending.append(rc.getChildren(None, DoneGetChildren()))
+ contextsCache = ContextsCache(c)
+ def done():
+ print "ContextsCache is valid:", contextsCache.getData()
+ protocol.invokeLater(contextsCache.validate, done)
+
+if __name__ == '__main__':
+ test()
diff --git a/python/src/tcf/tests/__init__.py b/python/src/tcf/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/python/src/tcf/tests/__init__.py diff --git a/python/src/tcf/transport.py b/python/src/tcf/transport.py new file mode 100644 index 000000000..7a54ce0f1 --- /dev/null +++ b/python/src/tcf/transport.py @@ -0,0 +1,155 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import threading, exceptions
+import protocol, channel
+from tcf.services import locator
+
+_channels = []
+_listeners = []
+_transports = {}
+_lock = threading.RLock()
+
+class TransportProvider(object):
+ """
+ TransportProvider represents communication protocol that can be used to open TCF communication channels.
+ Examples of transports are: TCP/IP, RS-232, USB.
+
+ Client can implement this interface if they want to provide support for a transport that is not
+ supported directly by the framework.
+ """
+
+ def getName(self):
+ """
+ Return transport name. Same as used as peer attribute, @see IPeer.ATTR_TRANSPORT_NAME
+ @return transport name.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def openChannel(self, peer):
+ """
+ Open channel to communicate with this peer using this transport.
+ Note: the channel can be not fully open yet when this method returns.
+ It's state can be IChannel.STATE_OPENING.
+ Protocol.Listener will be called when the channel will be opened or closed.
+ @param peer - a IPeer object that describes remote end-point of the channel.
+ @return TCF communication channel.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
+
+def addTransportProvider(transport):
+ name = transport.getName()
+ assert name
+ with _lock:
+ if _transports.get(name): raise exceptions.Exception("Already registered: " + name)
+ _transports[name] = transport
+
+def removeTransportProvider(transport):
+ name = transport.getName()
+ assert name
+ with _lock:
+ if _transports.get(name) == transport: del _transports[name]
+
+def openChannel(peer):
+ name = peer.getTransportName()
+ if not name: raise Exception("No transport name")
+ with _lock:
+ provider = _transports.get(name)
+ if not provider: raise Exception("Unknown transport name: " + name)
+ return provider.openChannel(peer)
+
+def channelOpened(channel):
+ assert channel not in _channels
+ _channels.append(channel)
+ for l in _listeners:
+ try:
+ l.onChannelOpen(channel)
+ except exceptions.Exception as x:
+ protocol.log("Exception in channel listener", x)
+
+def channelClosed(channel, error):
+ assert channel in _channels
+ _channels.remove(channel)
+
+def getOpenChannels():
+ return _channels[:]
+
+def addChannelOpenListener(listener):
+ assert listener
+ _listeners.append(listener)
+
+def removeChannelOpenListener(listener):
+ _listeners.remove(listener)
+
+class TCPTransportProvider(TransportProvider):
+ def getName(self):
+ return "TCP"
+ def openChannel(self, p):
+ assert self.getName() == p.getTransportName()
+ from tcf import peer
+ attrs = p.getAttributes()
+ host = attrs.get(peer.ATTR_IP_HOST)
+ port = attrs.get(peer.ATTR_IP_PORT)
+ if not host: raise exceptions.RuntimeError("No host name")
+ from channel.ChannelTCP import ChannelTCP
+ return ChannelTCP(p, host, _parsePort(port))
+
+def _parsePort(port):
+ if not port: raise exceptions.Exception("No port number")
+ try:
+ return int(port)
+ except exceptions.Exception:
+ raise exceptions.RuntimeError(
+ "Invalid value of \"Port\" attribute. Must be decimal number.")
+
+def sendEvent(service_name, event_name, data):
+ """
+ Transmit TCF event message.
+ The message is sent to all open communication channels - broadcasted.
+
+ This is internal API, TCF clients should use protocol.sendEvent().
+ """
+ for c in _channels:
+ # Skip channels that are executing "redirect" command - STATE_OPENING
+ if c.getState() == channel.STATE_OPEN:
+ s = c.getLocalService(service_name)
+ if s: c.sendEvent(s, event_name, data)
+
+def sync(done):
+ """
+ Call back after TCF messages sent by this host up to this moment are delivered
+ to their intended targets. This method is intended for synchronization of messages
+ across multiple channels.
+
+ Note: Cross channel synchronization can reduce performance and throughput.
+ Most clients don't need cross channel synchronization and should not call this method.
+
+ @param done will be executed by dispatch thread after communication
+ messages are delivered to corresponding targets.
+
+ This is internal API, TCF clients should use protocol.sync().
+ """
+ set = set()
+ class DoneSync(locator.DoneSync):
+ def doneSync(self, token):
+ assert set.contains(token)
+ set.remove(token)
+ if len(set) == 0: done()
+ done_sync = DoneSync()
+ for c in _channels:
+ if c.getState() == channel.STATE_OPEN:
+ s = c.getRemoteService(locator.NAME)
+ if s: set.append(s.sync(done_sync))
+ if len(set) == 0: protocol.invokeLater(done)
+
+# initialize TCP transport
+addTransportProvider(TCPTransportProvider())
+
diff --git a/python/src/tcf/util/__init__.py b/python/src/tcf/util/__init__.py new file mode 100644 index 000000000..cf36110a7 --- /dev/null +++ b/python/src/tcf/util/__init__.py @@ -0,0 +1,10 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
diff --git a/python/src/tcf/util/cache.py b/python/src/tcf/util/cache.py new file mode 100644 index 000000000..99aa13c12 --- /dev/null +++ b/python/src/tcf/util/cache.py @@ -0,0 +1,270 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import cStringIO, exceptions
+from tcf import protocol, channel
+
+class DataCache(object):
+ """
+ Objects of this class are used to cache TCF remote data.
+ The cache is asynchronous state machine. The states are:
+ 1. Valid - cache is in sync with remote data, use getError() and getData() to get cached data
+ 2. Invalid - cache is out of sync, start data retrieval by calling validate()
+ 3. Pending - cache is waiting result of a command that was sent to remote peer
+ 4. Disposed - cache is disposed and cannot be used to store data.
+
+ A cache instance can be created on any data type that needs to be cached.
+ Examples might be context children list, context properties, context state, memory data,
+ register data, symbol, variable, etc.
+ Clients of cache items can register for cache changes, but don't need to think about any particular events
+ since that is handled by the cache item itself.
+
+ A typical cache client should implement Runnable interface.
+ The implementation of run() method should:
+
+ Validate all cache items required for client task.
+ If anything is invalid then client should not alter any shared data structures,
+ should discard any intermediate results and register (wait) for changes of invalid cache instance(s) state.
+ When cache item state changes, client is invoked again and full validation is restarted.
+ Once everything is valid, client completes its task in a single dispatch cycle.
+
+ Note: clients should never retain copies of remote data across dispatch cycles!
+ Such data would get out of sync and compromise data consistency.
+ All remote data and everything derived from remote data should be kept in cache items
+ that implement proper event handling and can keep data consistent across dispatch cycles.
+ """
+
+ __error = None
+ __valid = None
+ __posted = False
+ __disposed = False
+ __data = None
+
+ def __init__(self, channel):
+ assert channel
+ self._channel = channel
+ self._command = None
+ self.__waiting_list = None
+
+ def post(self):
+ if self.__posted: return
+ if not self.__waiting_list: return
+ protocol.invokeLater(self)
+ self.__posted = True
+
+ def isValid(self):
+ """
+ @return True if cache contains up-to-date data or error.
+ """
+ return self.__valid
+
+ def isPending(self):
+ """
+ @return True if data retrieval command is in progress.
+ """
+ return self._command is not None
+
+ def isDisposed(self):
+ """
+ @return True if cache is disposed.
+ """
+ return self.__disposed
+
+ def getError(self):
+ """
+ @return error object if data retrieval ended with an error, or None if retrieval was successful.
+ Note: It is prohibited to call this method when cache is not valid.
+ """
+ assert self.__valid
+ return self.__error
+
+ def getData(self):
+ """
+ @return cached data object.
+ Note: It is prohibited to call this method when cache is not valid.
+ """
+ assert protocol.isDispatchThread()
+ assert self.__valid
+ return self.__data
+
+ def __call__(self):
+ """
+ Notify waiting clients about cache state change and remove them from wait list.
+ It is responsibility of clients to check if the state change was one they are waiting for.
+ Clients are not intended to call this method.
+ """
+ assert protocol.isDispatchThread()
+ self.__posted = False
+ if self.__waiting_list:
+ arr = self.__waiting_list
+ self.__waiting_list = None
+ for r in arr:
+ if isinstance(r, DataCache) and r._DataCache__posted: continue
+ r()
+ if self.__waiting_list is None: self.__waiting_list = arr
+
+ def wait(self, cb):
+ """
+ Add a client call-back to cache wait list.
+ Client call-backs are activated when cache state changes.
+ Call-backs are removed from waiting list after that.
+ It is responsibility of clients to check if the state change was one they are waiting for.
+ @param cb - a call-back object
+ """
+ assert protocol.isDispatchThread()
+ assert not self.__disposed
+ assert not self.__valid
+ if cb and not self.isWaiting(cb):
+ if self.__waiting_list is None: self.__waiting_list = []
+ self.__waiting_list.append(cb)
+
+ def isWaiting(self, cb):
+ """
+ Return True if a client call-back is waiting for state changes of this cache item.
+ @param cb - a call-back object.
+ @return True if 'cb' is in the wait list.
+ """
+ if not self.__waiting_list: return False
+ for r in self.__waiting_list:
+ if r is cb: return True
+ return False
+
+ def __validate(self):
+ """
+ Initiate data retrieval if the cache is not valid.
+ @return True if the cache is already valid
+ """
+ assert protocol.isDispatchThread()
+ if self.__disposed or self._channel.getState() != channel.STATE_OPEN:
+ self._command = None
+ self.__valid = True
+ self.__error = None
+ self.__data = None
+ else:
+ if self._command is not None: return False
+ if not self.__valid and not self.startDataRetrieval(): return False
+ assert self.__valid
+ assert self._command is None
+ self.post()
+ return True
+
+ def validate(self, cb=None):
+ """
+ If the cache is not valid, initiate data retrieval and
+ add a client call-back to cache wait list.
+ Client call-backs are activated when cache state changes.
+ Call-backs are removed from waiting list after that.
+ It is responsibility of clients to check if the state change was one they are waiting for.
+ If the cache is valid do nothing and return True.
+ @param cb - a call-back object (optional)
+ @return True if the cache is already valid
+ """
+ if not self.__validate():
+ if cb: self.wait(cb)
+ return False
+ return True
+
+ def start(self, command):
+ """
+ Start cache pending state.
+ Pending state is when the cache is waiting for a TCF command to return results.
+ @param command - TCF command handle.
+ """
+ assert not self.__valid
+ assert command
+ assert self._command is None
+ self._command = command
+
+ def done(self, command):
+ """
+ End cache pending state, but not mark the cache as valid.
+ @param command - TCF command handle.
+ """
+ if self._command is not command: return
+ assert not self.__valid
+ self._command = None
+ self.post()
+
+ def set(self, token, error, data):
+ """
+ End cache pending state and mark the cache as valid.
+ If 'token' != None, the data represent results from a completed command.
+ The data should be ignored if current cache pending command is not same as 'token'.
+ It can happen if the cache was reset or canceled during data retrieval.
+ @param token - pending command handle or None.
+ @param error - data retrieval error or None
+ @param data - up-to-date data object
+ """
+ assert protocol.isDispatchThread()
+ if token and self._command is not token: return
+ self._command = None
+ if not self.__disposed:
+ assert not self.__valid
+ if self._channel.getState() != channel.STATE_OPEN:
+ self.__error = None
+ self.__data = None
+ self.__error = error
+ self.__data = data
+ self.__valid = True
+ self.post()
+
+ def reset(self, data=None):
+ """
+ Force cache to become valid, cancel pending data retrieval if data is provided.
+ @param data - up-to-date data object (optional)
+ """
+ assert protocol.isDispatchThread()
+ if data is not None and self._command is not None:
+ self._command.cancel()
+ self._command = None
+ if not self.__disposed:
+ self.__data = data
+ self.__error = None
+ self.__valid = True
+ self.post()
+
+ def cancel(self):
+ """
+ Invalidate the cache.
+ Cancel pending data retrieval if any.
+ """
+ self.reset()
+ if self._command is not None:
+ self._command.cancel()
+ self._command = None
+
+ def dispose(self):
+ """
+ Dispose the cache.
+ Cancel pending data retrieval if any.
+ """
+ self.cancel()
+ self.__valid = True
+ self.__disposed = True
+
+ def __str__(self):
+ bf = cStringIO.StringIO()
+ bf.write('[')
+ if self.__valid: bf.append("valid,")
+ if self.__disposed: bf.write("disposed,")
+ if self.__posted: bf.write("posted,")
+ if self.__error is not None: bf.write("error,")
+ bf.write("data=")
+ bf.write(str(self.__data))
+ bf.write(']')
+ return bf.getvalue()
+
+ def startDataRetrieval(self):
+ """
+ Sub-classes should override this method to implement actual data retrieval logic.
+ @return True is all done, False if retrieval is in progress.
+ """
+ raise exceptions.NotImplementedError("Abstract method")
diff --git a/python/src/tcf/util/event.py b/python/src/tcf/util/event.py new file mode 100644 index 000000000..3be8d694e --- /dev/null +++ b/python/src/tcf/util/event.py @@ -0,0 +1,86 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import exceptions, threading
+from tcf import protocol, channel
+
+class DelegatingEventListener(channel.EventListener):
+ def __init__(self, callable):
+ self._callable = callable
+ def event(self, name, data):
+ try:
+ args = channel.fromJSONSequence(data)
+ self._callable(self.svc_name, name, *args)
+ except exceptions.Exception as x:
+ protocol.log("Error decoding event data", x)
+
+def _print_event(service, name, *args):
+ print "Event: %s.%s%s" % (service, name, tuple(args))
+
+def get_event_printer():
+ return DelegatingEventListener(_print_event)
+
+class EventRecorder(object):
+ def __init__(self, channel):
+ self._channel = channel
+ self._events = []
+ self._listeners = {}
+ self._lock = threading.RLock()
+ self._filter = None
+ def __del__(self):
+ if self._channel.state == channel.STATE_OPEN:
+ self.stop()
+ def record(self, service, enable=True):
+ with self._lock:
+ listener = self._listeners.get(service)
+ if listener:
+ if not enable:
+ protocol.invokeLater(self._channel.removeEventListener, service, listener)
+ elif enable:
+ recorder = self
+ class Listener(channel.EventListener):
+ def event(self, name, data):
+ e = Event(service, name, data)
+ recorder._event(e)
+ listener = Listener()
+ self._listeners[service] = listener
+ protocol.invokeLater(self._channel.addEventListener, service, listener)
+ self._recording = enable
+ def stop(self, service=None):
+ if service:
+ self.record(service, False)
+ else:
+ for service in self._listeners.keys():
+ self.record(service, False)
+ def get(self):
+ with self._lock:
+ events = self._events
+ self._events = []
+ return events
+ def _event(self, e):
+ with self._lock:
+ self._events.append(e)
+ def __str__(self):
+ events = self.get()
+ return "\n".join(map(str, events))
+ __repr__ = __str__
+
+class Event(object):
+ def __init__(self, service, name, data):
+ self.service = service
+ self.name = name
+ try:
+ self.args = channel.fromJSONSequence(data)
+ except exceptions.Exception as x:
+ protocol.log("Error decoding event data", x)
+ def __str__(self):
+ return "Event: %s.%s%s" % (self.service, self.name, tuple(self.args))
+ __repr__ = __str__
diff --git a/python/src/tcf/util/sync.py b/python/src/tcf/util/sync.py new file mode 100644 index 000000000..cb44b94d6 --- /dev/null +++ b/python/src/tcf/util/sync.py @@ -0,0 +1,214 @@ +# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# * http://www.eclipse.org/legal/epl-v10.html
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+
+import threading, exceptions, types
+from tcf import protocol
+from tcf.channel.Command import Command
+
+class DispatchWrapper(object):
+ "Simple wrapper for attribute access and invocation on TCF dispatch thread"
+ def __init__(self, inner):
+ self.inner = inner
+ def __getattr__(self, attr):
+ val = protocol.invokeAndWait(getattr, self.inner, attr)
+ if type(val) in (types.FunctionType, types.MethodType):
+ return DispatchWrapper(val)
+ return val
+ def __call__(self, *args, **kwargs):
+ return protocol.invokeAndWait(self.inner, *args, **kwargs)
+
+class CommandControl(object):
+ """Provides a simple interface to send commands to remote services
+ and receive results.
+
+ Usage:
+ > cmd = CommandControl(channel)
+ > cmd.<service>.<command>(<args>)
+
+ Examples:
+ # send command, but don't wait for result:
+ > cmd.RunControl.suspend("system")
+ # getE() returns the result and raises an exception in case of error response:
+ > result = cmd.RunControl.getChildren(None).getE()
+ # to get error and result at the same time, use this form:
+ > error, result = cmd.Breakpoints.getIDs()
+ """
+ def __init__(self, channel, onDone=None, interactive=False):
+ self._lock = threading.Condition()
+ self._channel = channel
+ self._onDone = onDone
+ self._interactive = interactive
+ self._queue = []
+ self._pending = {}
+ self._complete = []
+ def __getattr__(self, attr):
+ val = getattr(self._channel, attr, None)
+ if val:
+ if self._interactive and type(val) in (types.FunctionType, types.MethodType):
+ val = DispatchWrapper(val)
+ return val
+ services = protocol.invokeAndWait(self._channel.getRemoteServices)
+ if attr == "services":
+ return services
+ if attr in services:
+ return ServiceWrapper(self, attr)
+ raise exceptions.AttributeError("Unknown service: %s. Use one of %s" % (attr, services))
+ def invoke(self, service, command, *args, **kwargs):
+ cmd = None
+ if not protocol.isDispatchThread():
+ if not kwargs.get("async"):
+ cmd = protocol.invokeAndWait(self._invoke, service, command, *args, **kwargs)
+ if cmd and self._interactive:
+ return cmd.getE()
+ else:
+ with self._lock:
+ self._queue.append((service, command, args, kwargs))
+ if len(self._queue) == 1:
+ protocol.invokeLater(self._processQueue)
+ return
+ return cmd
+ def _invoke(self, service, command, *args, **kwargs):
+ cmdCtrl = self
+ class GenericCommand(Command):
+ _result = None
+ def done(self, error, args):
+ resultArgs = None
+ if not error and args:
+ # error result is usually in args[0]
+ if service == "StackTrace" and command == "getContext":
+ err = self.toError(args[1])
+ resultArgs = (args[0],)
+ else:
+ err = self.toError(args[0])
+ resultArgs = args[1:]
+ if err:
+ error = err
+ cmdCtrl._doneCommand(self.token, error, resultArgs)
+ def wait(self, timeout=None):
+ cmdCtrl._waitForCommand(self.token, timeout)
+ def cancel(self):
+ return protocol.invokeAndWait(self.token.cancel)
+ def getResult(self, wait=None):
+ if wait:
+ cmdCtrl._waitForCommand(self.token)
+ return self._result
+ def getE(self):
+ r = self.getResult(True)
+ if r.error:
+ raise r.error
+ return r.args
+ def get(self):
+ r = self.getResult(True)
+ return r.args
+ def getError(self):
+ r = self.getResult(True)
+ return r.error
+ def __str__(self):
+ if self._async:
+ return self.getCommandString()
+ return str(self.get())
+ def __iter__(self):
+ return iter(self.getResult(True))
+ cmd = GenericCommand(self._channel, service, command, args)
+ cmd._async = kwargs.get("async")
+ cmd._onDone = kwargs.get("onDone")
+ self._addPending(cmd)
+ return cmd
+ def _processQueue(self):
+ assert protocol.isDispatchThread()
+ with self._lock:
+ for cmd in self._queue:
+ service, command, args, kwargs = cmd
+ self._invoke(service, command, *args, **kwargs)
+ del self._queue[:]
+ def _addPending(self, cmd):
+ with self._lock:
+ self._pending[cmd.token.id] = cmd
+ self._lock.notifyAll()
+ def _doneCommand(self, token, error, args):
+ with self._lock:
+ cmd = self._pending.get(token.id)
+ assert cmd
+ del self._pending[token.id]
+ cmd._result = CommandResult(token, error, args)
+ if cmd._async: self._complete.append(cmd)
+ isDone = self.isDone()
+ if isDone: self._lock.notifyAll()
+ if cmd._onDone: cmd._onDone(error, *args)
+ if isDone and self._onDone: self._onDone()
+ def isDone(self):
+ with self._lock:
+ return not self._pending and not self._queue
+ def wait(self, timeout=None):
+ assert not protocol.isDispatchThread()
+ with self._lock:
+ while self._pending or self._queue:
+ self._lock.wait(timeout)
+ if timeout: break
+ def _waitForCommand(self, token, timeout=None):
+ assert not protocol.isDispatchThread()
+ with self._lock:
+ while self._pending.has_key(token.id):
+ self._lock.wait(timeout)
+ if timeout: break
+ else:
+ if self._queue:
+ self._lock.wait(timeout)
+ while self._pending.has_key(token.id):
+ self._lock.wait(timeout)
+ if timeout: break
+ def cancel(self):
+ if not protocol.isDispatchThread():
+ protocol.invokeLater(self.cancel)
+ return
+ with self._lock:
+ for cmd in self._pending.values():
+ cmd.token.cancel()
+ del self._queue[:]
+ def getResult(self, wait=True):
+ if wait:
+ self.wait()
+ with self._lock:
+ result = map(lambda c: c.getResult(), self._complete)
+ del self._complete[:]
+ return result
+
+class CommandResult(object):
+ def __init__(self, token, error, args):
+ self.token = token
+ self.error = error
+ # unwrap result if only one element
+ if args and len(args) == 1:
+ args = args[0]
+ self.args = args
+ def __str__(self):
+ if self.error:
+ return "[%s] error: %s" % (self.token.id, self.error)
+ return "[%s] result: %s" % (self.token.id, self.args)
+ __repr__ = __str__
+ def __iter__(self):
+ yield self.error
+ yield self.args
+
+class ServiceWrapper(object):
+ def __init__(self, control, service):
+ self._control = control
+ self._service = service
+ def __getattr__(self, attr):
+ return CommandWrapper(self._control, self._service, attr)
+
+class CommandWrapper(object):
+ def __init__(self, control, service, command):
+ self._control = control
+ self._service = service
+ self._command = command
+ def __call__(self, *args, **kwargs):
+ return self._control.invoke(self._service, self._command, *args, **kwargs)
diff --git a/python/src/tcf/util/task.py b/python/src/tcf/util/task.py new file mode 100644 index 000000000..e4230c9dc --- /dev/null +++ b/python/src/tcf/util/task.py @@ -0,0 +1,204 @@ +#******************************************************************************
+# Copyright (c) 2011 Wind River Systems, Inc. and others.
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Public License v1.0
+# which accompanies this distribution, and is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Contributors:
+# Wind River Systems - initial API and implementation
+#******************************************************************************
+
+import threading, exceptions
+from tcf import protocol
+
+class Task(object):
+ """
+ A <tt>Task</tt> is an utility class that represents the result of an asynchronous
+ communication over TCF framework. Methods are provided to check if the communication is
+ complete, to wait for its completion, and to retrieve the result of
+ the communication.
+
+ Task is useful when communication is requested by a thread other then TCF dispatch thread.
+ If client has a global state, for example, cached remote data, multithreading should be avoided,
+ because it is extremely difficult to ensure absence of racing conditions or deadlocks in such environment.
+ Such clients should consider message driven design, see DataCache and its usage as an example.
+
+ If a client is extending Task it should implement run() method to perform actual communications.
+ The run() method will be execute by TCF dispatch thread, and client code should then call either done() or
+ error() to indicate that task computations are complete.
+ """
+ __result = None
+ __is_done = False
+ __error = None
+ __canceled = False
+ __channel = None
+
+ def __init__(self, target=None, *args, **kwargs):
+ """
+ Construct a TCF task object and schedule it for execution.
+ """
+ if target:
+ kwargs["done"] = self.__done
+ else:
+ target = self.run
+
+ self._target = target
+ self._args = args
+ self._kwargs = kwargs
+ self._lock = threading.Condition()
+ protocol.invokeLater(self.__doRun)
+ timeout = kwargs.get("timeout")
+ if timeout:
+ protocol.invokeLaterWithDelay(timeout, self.cancel)
+
+# """
+# Construct a TCF task object and schedule it for execution.
+# The task will be __canceled if the given __channel is closed or
+# terminated while the task is in progress.
+# @param __channel
+# """
+# public Task(final IChannel __channel) {
+# Protocol.invokeLater(new Runnable() {
+# public void run() {
+# try {
+# if (__channel.getState() != IChannel.STATE_OPEN) throw new Exception("Channel is closed")
+# Task.this.__channel = __channel
+# channel_listener = new IChannel.IChannelListener() {
+#
+# public void congestionLevel(int level) {
+# }
+#
+# public void onChannelClosed(final Throwable __error) {
+# cancel(true)
+# }
+#
+# public void onChannelOpened() {
+# }
+# }
+# __channel.addChannelListener(channel_listener)
+# Task.this.run()
+# }
+# catch (Throwable x) {
+# if (!__is_done && __error == null) error(x)
+# }
+# }
+# })
+# }
+
+ def __doRun(self):
+ try:
+ self._target(*self._args, **self._kwargs)
+ except exceptions.Exception as x:
+ if not self.__is_done and self.__error is None:
+ self.error(x)
+
+ def __done(self, error, result):
+ if error:
+ self.error(error)
+ else:
+ self.done(result)
+
+ def run(self, *args, **kwargs):
+ raise exceptions.NotImplementedError("Abstract method")
+
+ def done(self, result):
+ with self._lock:
+ assert protocol.isDispatchThread()
+ if self.__canceled: return
+ assert not self.__is_done
+ assert not self.__error
+ assert self.__result is None
+ self.__result = result
+ self.__is_done = True
+ if self.__channel:
+ self.__channel.removeChannelListener(self.channel_listener)
+ self._lock.notifyAll()
+
+ def error(self, error):
+ """
+ Set a __error and notify all threads waiting for the task to complete.
+ The method is supposed to be called in response to executing of run() method of this task.
+
+ @param __error - computation __error.
+ """
+ assert protocol.isDispatchThread()
+ assert error
+ with self._lock:
+ if self.__canceled: return
+ assert self.__error is None
+ assert self.__result is None
+ assert not self.__is_done
+ self.__error = error
+ if self.__channel:
+ self.__channel.removeChannelListener(self.channel_listener)
+ self._lock.notifyAll()
+
+ def cancel(self):
+ assert protocol.isDispatchThread()
+ with self._lock:
+ if self.isDone(): return False
+ self.__canceled = True
+ self.__error = exceptions.Exception("Canceled")
+ if self.__channel:
+ self.__channel.removeChannelListener(self.channel_listener)
+ self._lock.notifyAll()
+ return True
+
+ def get(self, timeout=None):
+ """
+ Waits if necessary for the computation to complete, and then
+ retrieves its result.
+
+ @return the computed result
+ @throws CancellationException if the computation was __canceled
+ @throws ExecutionException if the computation threw an
+ exception
+ @throws InterruptedException if the current thread was interrupted
+ while waiting
+ """
+ assert not protocol.isDispatchThread()
+ with self._lock:
+ while not self.isDone():
+ self._lock.wait(timeout)
+ if timeout and not self.isDone():
+ raise TimeoutException("Timed out")
+ if self.__error:
+ raise exceptions.Exception("TCF task aborted", self.__error)
+ return self.__result
+
+ def isCancelled(self):
+ """
+ Returns <tt>true</tt> if this task was __canceled before it completed
+ normally.
+
+ @return <tt>true</tt> if task was __canceled before it completed
+ """
+ with self._lock:
+ return self.__canceled
+
+ def isDone(self):
+ """
+ Returns <tt>true</tt> if this task completed.
+
+ Completion may be due to normal termination, an exception, or
+ cancellation -- in all of these cases, this method will return
+ <tt>true</tt>.
+
+ @return <tt>true</tt> if this task completed.
+ """
+ with self._lock:
+ return self.__error or self.__is_done
+
+ def getError(self):
+ """
+ Return task execution __error if any.
+ @return Throwable object or null
+ """
+ return self.__error
+
+ def getResult(self):
+ return self.__result
+
+class TimeoutException(exceptions.Exception):
+ pass
|