Skip to main content
diff options
Diffstat (limited to 'python/src/tcf/tests/')
1 files changed, 292 insertions, 0 deletions
diff --git a/python/src/tcf/tests/ b/python/src/tcf/tests/
new file mode 100644
index 000000000..b4da8fbac
--- /dev/null
+++ b/python/src/tcf/tests/
@@ -0,0 +1,292 @@
+# *******************************************************************************
+# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# * All rights reserved. This program and the accompanying materials
+# * are made available under the terms of the Eclipse Public License v1.0
+# * which accompanies this distribution, and is available at
+# *
+# *
+# * Contributors:
+# * Wind River Systems - initial API and implementation
+# *******************************************************************************
+import sys, time, threading, exceptions
+import tcf
+from tcf import protocol, channel
+from tcf.util import sync
+class TraceListener(channel.TraceListener):
+ def onMessageReceived(self, type, token, service, name, data):
+ print "<<<", type, token, service, name, data
+ def onMessageSent(self, type, token, service, name, data):
+ print ">>>", type, token, service, name, data
+ def onChannelClosed(self, error):
+ print>>sys.stderr, "*** closed ***", error
+_suspended = []
+def test():
+ protocol.startEventQueue()
+ c = tcf.connect("TCP:")
+ assert c.state == channel.STATE_OPEN
+ #protocol.invokeLater(c.addTraceListener, TraceListener())
+ def r2():
+ print "services=", c.getRemoteServices()
+ protocol.invokeLater(r2)
+ testRunControl(c)
+ testBreakpoints(c)
+ testSymbols(c)
+ testSyncCommands(c)
+ testEvents(c)
+ testDataCache(c)
+ if c.state == channel.STATE_OPEN:
+ time.sleep(10)
+ protocol.invokeLater(c.close)
+ time.sleep(5)
+def testRunControl(c):
+ lock = threading.Condition()
+ from import runcontrol
+ def r3():
+ rctrl = c.getRemoteService(runcontrol.NAME)
+ pending = []
+ class DoneGetContext(runcontrol.DoneGetContext):
+ def doneGetContext(self, token, error, context):
+ pending.remove(token)
+ if error:
+ protocol.log("Error from RunControl.getContext", error)
+ else:
+ print "ID: ", context.getID()
+ print "Name: ", context.getName()
+ print "Parent: ", context.getParentID()
+ print "IsContainer: ", context.isContainer()
+ class DoneGetState(runcontrol.DoneGetState):
+ def doneGetState(self, token, error, suspended, pc, reason, params):
+ pending.remove(token)
+ if error:
+ protocol.log("Error from RunControl.getState", error)
+ else:
+ print "suspended: ", suspended
+ print "pc: ", pc
+ print "reason: ", reason
+ print "params: ", params
+ if suspended:
+ _suspended.append(context.getID())
+ testStackTrace(c, context.getID())
+ if len(pending) == 0:
+ with lock:
+ lock.notify()
+ if context and context.hasState(): pending.append(context.getState(DoneGetState()))
+ if len(pending) == 0:
+ with lock:
+ lock.notify()
+ class DoneGetChildren(runcontrol.DoneGetChildren):
+ def doneGetChildren(self, token, error, context_ids):
+ pending.remove(token)
+ if error:
+ protocol.log("Error from RunControl.GetChildren", error)
+ else:
+ for c in context_ids:
+ pending.append(rctrl.getContext(c, DoneGetContext()))
+ pending.append(rctrl.getChildren(c, self))
+ if len(pending) == 0:
+ with lock:
+ lock.notify()
+ pending.append(rctrl.getChildren(None, DoneGetChildren()))
+ with lock:
+ protocol.invokeLater(r3)
+ lock.wait(5000)
+ def r4():
+ rc = c.getRemoteService(runcontrol.NAME)
+ class RCListener(runcontrol.RunControlListener):
+ def contextSuspended(self, *args):
+ print "context suspended: ", args
+ rc.removeListener(self)
+ def contextResumed(self, *args):
+ print "context resumed: ", args
+ def containerSuspended(self, *args):
+ print "container suspended:", args
+ rc.removeListener(self)
+ def containerResumed(self, *args):
+ print "container resumed:", args
+ rc.addListener(RCListener())
+ class DoneGetContext(runcontrol.DoneGetContext):
+ def doneGetContext(self, token, error, context):
+ if error:
+ protocol.log("Error from RunControl.getContext", error)
+ with lock: lock.notify()
+ return
+ class DoneResume(runcontrol.DoneCommand):
+ def doneCommand(self, token, error):
+ if error:
+ protocol.log("Error from RunControl.resume", error)
+ else:
+ context.suspend(runcontrol.DoneCommand())
+ with lock: lock.notify()
+ context.resume(runcontrol.RM_RESUME, 1, None, DoneResume())
+ rc.getContext(_suspended[0], DoneGetContext())
+ if _suspended:
+ with lock:
+ protocol.invokeLater(r4)
+ lock.wait(5000)
+def testBreakpoints(c):
+ from import breakpoints
+ def r3():
+ bps = c.getRemoteService(breakpoints.NAME)
+ class DoneGetIDs(breakpoints.DoneGetIDs):
+ def doneGetIDs(self, token, error, ids):
+ if error:
+ protocol.log("Error from Breakpoints.getIDs", error)
+ return
+ print "Breakpoints :", ids
+ class DoneGetProperties(breakpoints.DoneGetProperties):
+ def doneGetProperties(self, token, error, props):
+ if error:
+ protocol.log("Error from Breakpoints.getProperties", error)
+ return
+ print "Breakpoint Properties: ", props
+ class DoneGetStatus(breakpoints.DoneGetStatus):
+ def doneGetProperties(self, token, error, props):
+ if error:
+ protocol.log("Error from Breakpoints.getStatus", error)
+ return
+ print "Breakpoint Status: ", props
+ for id in ids:
+ bps.getProperties(id, DoneGetProperties())
+ bps.getStatus(id, DoneGetStatus())
+ bps.getIDs(DoneGetIDs())
+ protocol.invokeLater(r3)
+ def r4():
+ bpsvc = c.getRemoteService(breakpoints.NAME)
+ class BPListener(breakpoints.BreakpointsListener):
+ def breakpointStatusChanged(self, id, status):
+ print "breakpointStatusChanged", id, status
+ def contextAdded(self, bps):
+ print "breakpointAdded", bps
+ bpsvc.removeListener(self)
+ def contextChanged(self, bps):
+ print "breakpointChanged", bps
+ def contextRemoved(self, ids):
+ print "breakpointRemoved", ids
+ bpsvc.addListener(BPListener())
+ class DoneSet(breakpoints.DoneCommand):
+ def doneCommand(self, token, error):
+ if error:
+ protocol.log("Error from Breakpoints.set", error)
+ return
+ bp = {
+ breakpoints.PROP_ID : "python:1",
+ breakpoints.PROP_ENABLED : True,
+ breakpoints.PROP_LOCATION : "sysClkRateGet"
+ }
+ bpsvc.set([bp], DoneSet())
+ protocol.invokeLater(r4)
+def testStackTrace(c, ctx_id):
+ from import stacktrace
+ stack = c.getRemoteService(stacktrace.NAME)
+ class DoneGetChildren(stacktrace.DoneGetChildren):
+ def doneGetChildren(self, token, error, ctx_ids):
+ if error:
+ protocol.log("Error from StackTrace.getChildren", error)
+ return
+ class DoneGetContext(stacktrace.DoneGetContext):
+ def doneGetContext(self, token, error, ctxs):
+ if error:
+ protocol.log("Error from StackTrace.getContext", error)
+ return
+ if ctxs:
+ for ctx in ctxs:
+ print ctx.getProperties()
+ stack.getContext(ctx_ids, DoneGetContext())
+ stack.getChildren(ctx_id, DoneGetChildren())
+def testSymbols(c):
+ from import symbols
+ def symTest(ctx_id):
+ syms = c.getRemoteService(symbols.NAME)
+ class DoneList(symbols.DoneList):
+ def doneList(self, token, error, ctx_ids):
+ if error:
+ protocol.log("Error from Symbols.list", error)
+ return
+ class DoneGetContext(symbols.DoneGetContext):
+ def doneGetContext(self, token, error, ctxs):
+ if error:
+ protocol.log("Error from Symbols.getContext", error)
+ return
+ if ctxs:
+ for ctx in ctxs:
+ print ctx.getProperties()
+ if ctx_ids:
+ for ctx_id in ctx_ids:
+ syms.getContext(ctx_id, DoneGetContext())
+ syms.list(ctx_id, DoneList())
+ for ctx_id in _suspended:
+ protocol.invokeLater(symTest, ctx_id)
+def testSyncCommands(c):
+ # simplified command execution
+ ctl = sync.CommandControl(c)
+ for ctx_id in _suspended:
+ print ctl.Symbols.list(ctx_id)
+ for ctx_id in _suspended:
+ frame_ids = ctl.StackTrace.getChildren(ctx_id).get()
+ if frame_ids:
+ error, args = ctl.StackTrace.getContext(frame_ids)
+ if not error: print args
+ def gotBreakpoints(error, bps):
+ print "Got breakpoint list:", bps
+ ctl.Breakpoints.getIDs(onDone=gotBreakpoints)
+ try:
+ print ctl.Processes.getChildren(None, False)
+ except:
+ pass # no Processes service
+def testEvents(c):
+ from tcf.util import event
+ recorder = event.EventRecorder(c)
+ recorder.record("RunControl")
+ ctl = sync.CommandControl(c)
+ rc = ctl.RunControl
+ ctx = rc.getChildren(None).get()[0]
+ rc.resume(ctx, 0, 1, None).wait()
+ print recorder
+ rc.suspend(ctx).wait()
+ print recorder
+ recorder.stop()
+def testDataCache(c):
+ from tcf.util import cache
+ from import runcontrol
+ class ContextsCache(cache.DataCache):
+ def startDataRetrieval(self):
+ rc = self._channel.getRemoteService(runcontrol.NAME)
+ if not rc:
+ self.set(None, exceptions.Exception("No RunControl service"), None)
+ return
+ cache = self
+ pending = []
+ contexts = []
+ class DoneGetChildren(runcontrol.DoneGetChildren):
+ def doneGetChildren(self, token, error, context_ids):
+ pending.remove(token)
+ if error:
+ protocol.log("Error from RunControl.GetChildren", error)
+ else:
+ for c in context_ids:
+ contexts.append(c)
+ pending.append(rc.getChildren(c, self))
+ if len(pending) == 0:
+ cache.set(None, None, contexts)
+ pending.append(rc.getChildren(None, DoneGetChildren()))
+ contextsCache = ContextsCache(c)
+ def done():
+ print "ContextsCache is valid:", contextsCache.getData()
+ protocol.invokeLater(contextsCache.validate, done)
+if __name__ == '__main__':
+ test()

Back to the top