diff options
author | aleherbau | 2011-03-31 14:43:18 +0000 |
---|---|---|
committer | aleherbau | 2011-03-31 14:43:18 +0000 |
commit | 60783280bd63e742b2b70cdbdff8d93758ffe820 (patch) | |
tree | f056f0359bb04fc28ab3590b3dd412d4278b31b3 /python/src/tcf/protocol.py | |
parent | ab638aab968e71abfb8fde370b25c211434a2acd (diff) | |
download | org.eclipse.tcf-60783280bd63e742b2b70cdbdff8d93758ffe820.tar.gz org.eclipse.tcf-60783280bd63e742b2b70cdbdff8d93758ffe820.tar.xz org.eclipse.tcf-60783280bd63e742b2b70cdbdff8d93758ffe820.zip |
Initial commit of TCF python scripting support.
Diffstat (limited to 'python/src/tcf/protocol.py')
-rw-r--r-- | python/src/tcf/protocol.py | 312 |
1 files changed, 312 insertions, 0 deletions
diff --git a/python/src/tcf/protocol.py b/python/src/tcf/protocol.py new file mode 100644 index 000000000..2a00c31ff --- /dev/null +++ b/python/src/tcf/protocol.py @@ -0,0 +1,312 @@ +# ******************************************************************************* +# * Copyright (c) 2011 Wind River Systems, Inc. and others. +# * All rights reserved. This program and the accompanying materials +# * are made available under the terms of the Eclipse Public License v1.0 +# * which accompanies this distribution, and is available at +# * http://www.eclipse.org/legal/epl-v10.html +# * +# * Contributors: +# * Wind River Systems - initial API and implementation +# ******************************************************************************* + +""" +Module protocol provides static methods to access Target Communication Framework root objects: +1. the framework event queue and dispatch thread +2. local instance of Locator service, which maintains a list of available targets +3. list of open communication channels. + +It also provides utility methods for posting asynchronous events, +including delayed events (timers). +""" + +import sys, uuid, threading, exceptions +from EventQueue import EventQueue + +_event_queue = None +def startEventQueue(): + global _event_queue + if _event_queue and not _event_queue.isShutdown(): return + _event_queue = EventQueue() + _event_queue.start() + +def getEventQueue(): + """ + @return instance of event queue that is used for TCF events. + """ + return _event_queue + +def isDispatchThread(): + """ + Returns true if the calling thread is the TCF event dispatch thread. + Use this call to ensure that a given task is being executed (or not being) + on dispatch thread. + + @return true if running on the dispatch thread. + """ + return _event_queue is not None and _event_queue.isDispatchThread() + +def invokeLater(callable, *args, **kwargs): + """ + Causes callable to be called with given arguments + in the dispatch thread of the framework. + Events are dispatched in same order as queued. + If invokeLater is called from the dispatching thread + the callable will still be deferred until + all pending events have been processed. + + This method can be invoked from any thread. + + @param callable the callable to be executed asynchronously + """ + _event_queue.invokeLater(callable, *args, **kwargs) + +def invokeLaterWithDelay(delay, callable, *args, **kwargs): + """ + Causes callable event to called in the dispatch thread of the framework. + The event is dispatched after given delay. + + This method can be invoked from any thread. + + @param delay milliseconds to delay event dispatch. + If delay <= 0 the event is posted into the + "ready" queue without delay. + @param callable the callable to be executed asynchronously. + """ + if delay <= 0: + _event_queue.invokeLater(callable, *args, **kwargs) + else: + # TODO timer_queue + raise exceptions.NotImplementedError("Implement invokeLaterWithDelay") +# synchronized (timer_queue) { +# timer_queue.add(new Timer(System.currentTimeMillis() + delay, runnable)) +# timer_queue.notify() + +def invokeAndWait(callable, *args, **kwargs): + """ + Causes callable to be called in the dispatch thread of the framework. + Calling thread is suspended until the method is executed. + If invokeAndWait is called from the dispatching thread + the callable is executed immediately. + + This method can be invoked from any thread. + + @param runnable the callable to be executed on dispatch thread. + """ + if _event_queue.isDispatchThread(): + return callable(*args, **kwargs) + else: + class DoRun(): + result = None + def __call__(self): + try: + self.result = callable(*args, **kwargs) + finally: + with runLock: + runLock.notify() + doRun = DoRun() + runLock = threading.Condition() + with runLock: + _event_queue.invokeLater(doRun) + runLock.wait() + return doRun.result + +_agentID = str(uuid.uuid4()) +def getAgentID(): + return _agentID + +_logger = None +def setLogger(logger): + """ + Set framework logger. + By default sys.stderr is used. + + @param logger - an object implementing Logger interface. + """ + global _logger + _logger = logger + +def log(msg, x=None): + """ + Logs the given message. + @see #setLogger + This method can be invoked from any thread. + @param msg - log entry text + @param x - an exception associated with the log entry or null. + """ + if not _logger: + print>>sys.stderr, msg + while x: + import traceback + print>>sys.stderr, "%s: %s" % (type(x).__name__, x) + tb = getattr(x, "tb", None) or sys.exc_info()[2] + if tb: + traceback.print_tb(tb) + caused_by = getattr(x, "caused_by", None) + if caused_by: + print>>sys.stderr, "Caused by:" + x = caused_by + else: + break + else: + _logger.log(msg, x) + +def getLocator(): + from services import locator + return locator.getLocator() + +def getOpenChannels(): + """ + Return an array of all open channels. + @return an array of IChannel + """ + assert isDispatchThread() + import transport + return transport.getOpenChannels() + +class ChannelOpenListener(object): + """ + Interface to be implemented by clients willing to be notified when + new TCF communication channel is opened. + + The interface allows a client to get pointers to channel objects + that were opened by somebody else. If a client open a channel itself, it already has + the pointer and does not need Protocol.ChannelOpenListener. If a channel is created, + for example, by remote peer connecting to the client, the only way to get the pointer + is Protocol.ChannelOpenListener. + """ + def onChannelOpen(self, channel): + pass + +def addChannelOpenListener(listener): + """ + Add a listener that will be notified when new channel is opened. + @param listener + """ + assert isDispatchThread() + import transport + transport.addChannelOpenListener(listener) + +def removeChannelOpenListener(listener): + """ + Remove channel opening listener. + @param listener + """ + assert isDispatchThread() + import transport + transport.removeChannelOpenListener(listener) + +def sendEvent(service_name, event_name, data): + """ + Transmit TCF event message. + The message is sent to all open communication channels - broadcasted. + """ + assert isDispatchThread() + import transport + transport.sendEvent(service_name, event_name, data) + +def sync(done): + """ + Call back after all TCF messages sent by this host up to this moment are delivered + to their intended target. This method is intended for synchronization of messages + across multiple channels. + + Note: Cross channel synchronization can reduce performance and throughput. + Most clients don't need cross channel synchronization and should not call this method. + + @param done will be executed by dispatch thread after pending communication + messages are delivered to corresponding targets. + """ + assert isDispatchThread() + import transport + transport.sync(done) + +class CongestionMonitor(object): + """ + Clients implement CongestionMonitor interface to monitor usage of local resources, + like, for example, display queue size - if the queue becomes too big, UI response time + can become too high, or it can crash all together because of OutOfMemory errors. + TCF flow control logic prevents such conditions by throttling traffic coming from remote peers. + Note: Local (in-bound traffic) congestion is detected by framework and reported to + remote peer without client needed to be involved. Only clients willing to provide + additional data about local congestion should implement CongestionMonitor and + register it using Protocol.addCongestionMonitor(). + """ + def getCongestionLevel(self): + """ + Get current level of client resource utilization. + @return integer value in range -100..100, where -100 means all resources are free, + 0 means optimal load, and positive numbers indicate level of congestion. + """ + raise exceptions.NotImplementedError("Abstract method") + +_congestion_monitors = [] +def addCongestionMonitor(monitor): + """ + Register a congestion monitor. + @param monitor - client implementation of CongestionMonitor interface + """ + assert isDispatchThread() + _congestion_monitors.add(monitor) + +def removeCongestionMonitor(monitor): + """ + Unregister a congestion monitor. + @param monitor - client implementation of CongestionMonitor interface + """ + assert isDispatchThread() + _congestion_monitors.remove(monitor) + +def getCongestionLevel(): + """ + Get current level of local traffic congestion. + + @return integer value in range -100..100, where -100 means no pending + messages (no traffic), 0 means optimal load, and positive numbers + indicate level of congestion. + """ + assert isDispatchThread() + level = -100 + for m in _congestion_monitors: + n = m.getCongestionLevel() + if n > level: level = n + if _event_queue: + n = _event_queue.getCongestion() + if n > level: level = n + if level > 100: level = 100 + return level + +def addServiceProvider(provider): + """ + Register service provider. + This method can be invoked from any thread. + @param provider - ServiceProvider implementation + """ + import services + services.addServiceProvider(provider) + +def removeServiceProvider(provider): + """ + Unregister service provider. + This method can be invoked from any thread. + @param provider - ServiceProvider implementation + """ + import services + services.removeServiceProvider(provider) + +def addTransportProvider(provider): + """ + Register transport provider. + This method can be invoked from any thread. + @param provider - TransportProvider implementation + """ + import transport + transport.addTransportProvider(provider) + +def removeTransportProvider(provider): + """ + Unregister transport provider. + This method can be invoked from any thread. + @param provider - TransportProvider implementation + """ + import transport + transport.removeTransportProvider(provider) |