Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFrederic Leger2013-01-16 03:34:18 -0500
committerAnton Leherbauer2013-01-16 03:34:18 -0500
commit7997d52dc1d0e5a24de734a80016c65329cc9149 (patch)
treefe155ca75d2b22b3789737c82e696f45d61807fd /python/src/tcf/protocol.py
parent61435e0455753f81c20f42b9eef51f925da293ba (diff)
downloadorg.eclipse.tcf-7997d52dc1d0e5a24de734a80016c65329cc9149.tar.gz
org.eclipse.tcf-7997d52dc1d0e5a24de734a80016c65329cc9149.tar.xz
org.eclipse.tcf-7997d52dc1d0e5a24de734a80016c65329cc9149.zip
TCF Python: Bug 397827 - Use relative imports inside tcf package modules
Diffstat (limited to 'python/src/tcf/protocol.py')
-rw-r--r--python/src/tcf/protocol.py171
1 files changed, 115 insertions, 56 deletions
diff --git a/python/src/tcf/protocol.py b/python/src/tcf/protocol.py
index 1f94ff406..2fd45e634 100644
--- a/python/src/tcf/protocol.py
+++ b/python/src/tcf/protocol.py
@@ -1,5 +1,5 @@
-# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# *****************************************************************************
+# * Copyright (c) 2011, 2013 Wind River Systems, Inc. and others.
# * All rights reserved. This program and the accompanying materials
# * are made available under the terms of the Eclipse Public License v1.0
# * which accompanies this distribution, and is available at
@@ -7,44 +7,55 @@
# *
# * Contributors:
# * Wind River Systems - initial API and implementation
-# *******************************************************************************
+# *****************************************************************************
"""
-Module protocol provides static methods to access Target Communication Framework root objects:
+Module protocol provides static methods to access Target Communication
+Framework root objects:
+
1. the framework event queue and dispatch thread
-2. local instance of Locator service, which maintains a list of available targets
+2. local instance of Locator service, which maintains a list of available
+ targets
3. list of open communication channels.
It also provides utility methods for posting asynchronous events,
including delayed events (timers).
"""
-import sys, uuid, threading, time
-from EventQueue import EventQueue
+import sys
+import threading
+import time
+import uuid
+
+from . import EventQueue
_event_queue = None
_timer_dispatcher = None
+
def startEventQueue():
global _event_queue, _timer_dispatcher
- if _event_queue and not _event_queue.isShutdown(): return
- _event_queue = EventQueue(on_shutdown=shutdownDiscovery)
+ if _event_queue and not _event_queue.isShutdown():
+ return
+ _event_queue = EventQueue.EventQueue(on_shutdown=shutdownDiscovery)
_event_queue.start()
# initialize LocatorService
from services.local.LocatorService import LocatorService
_event_queue.invokeLater(LocatorService)
# start timer dispatcher
- _timer_dispatcher = threading.Thread(target = _dispatch_timers)
+ _timer_dispatcher = threading.Thread(target=_dispatch_timers)
_timer_dispatcher.setName("TCF Timer Dispatcher")
_timer_dispatcher.setDaemon(True)
_timer_dispatcher.start()
+
def getEventQueue():
"""
@return instance of event queue that is used for TCF events.
"""
return _event_queue
+
def isDispatchThread():
"""
Returns true if the calling thread is the TCF event dispatch thread.
@@ -55,7 +66,8 @@ def isDispatchThread():
"""
return _event_queue is not None and _event_queue.isDispatchThread()
-def invokeLater(callable, *args, **kwargs):
+
+def invokeLater(c, *args, **kwargs):
"""
Causes callable to be called with given arguments
in the dispatch thread of the framework.
@@ -66,11 +78,12 @@ def invokeLater(callable, *args, **kwargs):
This method can be invoked from any thread.
- @param callable the callable to be executed asynchronously
+ @param c - the callable to be executed asynchronously
"""
- _event_queue.invokeLater(callable, *args, **kwargs)
+ _event_queue.invokeLater(c, *args, **kwargs)
+
-def invokeLaterWithDelay(delay, callable, *args, **kwargs):
+def invokeLaterWithDelay(delay, c, *args, **kwargs):
"""
Causes callable event to called in the dispatch thread of the framework.
The event is dispatched after given delay.
@@ -80,17 +93,19 @@ def invokeLaterWithDelay(delay, callable, *args, **kwargs):
@param delay milliseconds to delay event dispatch.
If delay <= 0 the event is posted into the
"ready" queue without delay.
- @param callable the callable to be executed asynchronously.
+ @param c - the callable to be executed asynchronously.
"""
if delay <= 0:
- _event_queue.invokeLater(callable, *args, **kwargs)
+ _event_queue.invokeLater(c, *args, **kwargs)
else:
with _timer_queue_lock:
- _timer_queue.append(Timer(time.time() + delay/1000., callable, *args, **kwargs))
+ _timer_queue.append(Timer(time.time() + delay / 1000., c, *args,
+ **kwargs))
_timer_queue.sort()
_timer_queue_lock.notify()
-def invokeAndWait(callable, *args, **kwargs):
+
+def invokeAndWait(c, *args, **kwargs):
"""
Causes callable to be called in the dispatch thread of the framework.
Calling thread is suspended until the method is executed.
@@ -99,16 +114,17 @@ def invokeAndWait(callable, *args, **kwargs):
This method can be invoked from any thread.
- @param callable the callable to be executed on dispatch thread.
+ @param c the callable to be executed on dispatch thread.
"""
if _event_queue.isDispatchThread():
- return callable(*args, **kwargs)
+ return c(*args, **kwargs)
else:
class DoRun():
result = None
+
def __call__(self):
try:
- self.result = callable(*args, **kwargs)
+ self.result = c(*args, **kwargs)
finally:
with runLock:
runLock.notify()
@@ -120,10 +136,14 @@ def invokeAndWait(callable, *args, **kwargs):
return doRun.result
_agentID = str(uuid.uuid4())
+
+
def getAgentID():
return _agentID
_logger = None
+
+
def setLogger(logger):
"""
Set framework logger.
@@ -134,6 +154,7 @@ def setLogger(logger):
global _logger
_logger = logger
+
def log(msg, x=None):
"""
Logs the given message.
@@ -143,22 +164,23 @@ def log(msg, x=None):
@param x - an exception associated with the log entry or None.
"""
if not _logger:
- print>>sys.stderr, msg
+ print >> sys.stderr, msg
while x:
import traceback
- print>>sys.stderr, "%s: %s" % (type(x).__name__, x)
+ print >> sys.stderr, "%s: %s" % (type(x).__name__, x)
tb = getattr(x, "tb", None) or sys.exc_info()[2]
if tb:
traceback.print_tb(tb)
caused_by = getattr(x, "caused_by", None)
if caused_by:
- print>>sys.stderr, "Caused by:"
+ print >> sys.stderr, "Caused by:"
x = caused_by
else:
break
else:
_logger.log(msg, x)
+
def startDiscovery():
"Start discovery of remote peers if not running yet"
# initialize LocatorService
@@ -166,21 +188,24 @@ def startDiscovery():
if LocatorService.locator:
invokeAndWait(LocatorService.startup)
+
def shutdownDiscovery():
"Shutdown discovery if running"
- from tcf.services.local.LocatorService import LocatorService
+ from .services.local.LocatorService import LocatorService
if LocatorService.locator:
invokeAndWait(LocatorService.shutdown)
-
+
+
def getLocator():
"""
Get instance of the framework locator service.
The service can be used to discover available remote peers.
@return instance of LocatorService.
"""
- from tcf.services.local.LocatorService import LocatorService
+ from .services.local.LocatorService import LocatorService
return LocatorService.locator
+
def getOpenChannels():
"""
Return an array of all open channels.
@@ -190,20 +215,22 @@ def getOpenChannels():
import transport
return transport.getOpenChannels()
+
class ChannelOpenListener(object):
"""
Interface to be implemented by clients willing to be notified when
new TCF communication channel is opened.
- The interface allows a client to get pointers to channel objects
- that were opened by somebody else. If a client open a channel itself, it already has
- the pointer and does not need protocol.ChannelOpenListener. If a channel is created,
- for example, by remote peer connecting to the client, the only way to get the pointer
- is protocol.ChannelOpenListener.
+ The interface allows a client to get pointers to channel objects that were
+ opened by somebody else. If a client open a channel itself, it already has
+ the pointer and does not need protocol.ChannelOpenListener. If a channel is
+ created, for example, by remote peer connecting to the client, the only way
+ to get the pointer is protocol.ChannelOpenListener.
"""
def onChannelOpen(self, channel):
pass
+
def addChannelOpenListener(listener):
"""
Add a listener that will be notified when new channel is opened.
@@ -213,6 +240,7 @@ def addChannelOpenListener(listener):
import transport
transport.addChannelOpenListener(listener)
+
def removeChannelOpenListener(listener):
"""
Remove channel opening listener.
@@ -222,6 +250,7 @@ def removeChannelOpenListener(listener):
import transport
transport.removeChannelOpenListener(listener)
+
def sendEvent(service_name, event_name, data):
"""
Transmit TCF event message.
@@ -231,42 +260,54 @@ def sendEvent(service_name, event_name, data):
import transport
transport.sendEvent(service_name, event_name, data)
+
def sync(done):
"""
- Call back after all TCF messages sent by this host up to this moment are delivered
- to their intended target. This method is intended for synchronization of messages
- across multiple channels.
+ Call back after all TCF messages sent by this host up to this moment are
+ delivered to their intended target. This method is intended for
+ synchronization of messages across multiple channels.
Note: Cross channel synchronization can reduce performance and throughput.
- Most clients don't need cross channel synchronization and should not call this method.
+ Most clients don't need cross channel synchronization and should not call
+ this method.
- @param done will be executed by dispatch thread after pending communication
- messages are delivered to corresponding targets.
+ @param done - will be executed by dispatch thread after pending
+ communication messages are delivered to corresponding
+ targets.
"""
assert isDispatchThread()
import transport
transport.sync(done)
+
class CongestionMonitor(object):
"""
- Clients implement CongestionMonitor interface to monitor usage of local resources,
- like, for example, display queue size - if the queue becomes too big, UI response time
- can become too high, or it can crash all together because of OutOfMemory errors.
- TCF flow control logic prevents such conditions by throttling traffic coming from remote peers.
- Note: Local (in-bound traffic) congestion is detected by framework and reported to
- remote peer without client needed to be involved. Only clients willing to provide
- additional data about local congestion should implement CongestionMonitor and
- register it using protocol.addCongestionMonitor().
+ Clients implement CongestionMonitor interface to monitor usage of local
+ resources, like, for example, display queue size - if the queue becomes too
+ big, UI response time can become too high, or it can crash all together
+ because of OutOfMemory errors.
+
+ TCF flow control logic prevents such conditions by throttling traffic
+ coming from remote peers.
+
+ Note: Local (in-bound traffic) congestion is detected by framework and
+ reported to remote peer without client needed to be involved. Only
+ clients willing to provide additional data about local congestion
+ should implement CongestionMonitor and register it using
+ protocol.addCongestionMonitor().
"""
def getCongestionLevel(self):
"""
Get current level of client resource utilization.
- @return integer value in range -100..100, where -100 means all resources are free,
- 0 means optimal load, and positive numbers indicate level of congestion.
+ @return integer value in range -100..100, where -100 means all
+ resources are free, 0 means optimal load, and positive numbers
+ indicate level of congestion.
"""
raise NotImplementedError("Abstract method")
_congestion_monitors = []
+
+
def addCongestionMonitor(monitor):
"""
Register a congestion monitor.
@@ -275,6 +316,7 @@ def addCongestionMonitor(monitor):
assert isDispatchThread()
_congestion_monitors.add(monitor)
+
def removeCongestionMonitor(monitor):
"""
Unregister a congestion monitor.
@@ -283,6 +325,7 @@ def removeCongestionMonitor(monitor):
assert isDispatchThread()
_congestion_monitors.remove(monitor)
+
def getCongestionLevel():
"""
Get current level of local traffic congestion.
@@ -295,13 +338,17 @@ def getCongestionLevel():
level = -100
for m in _congestion_monitors:
n = m.getCongestionLevel()
- if n > level: level = n
+ if n > level:
+ level = n
if _event_queue:
n = _event_queue.getCongestion()
- if n > level: level = n
- if level > 100: level = 100
+ if n > level:
+ level = n
+ if level > 100:
+ level = 100
return level
+
def addServiceProvider(provider):
"""
Register service provider.
@@ -311,6 +358,7 @@ def addServiceProvider(provider):
import services
services.addServiceProvider(provider)
+
def removeServiceProvider(provider):
"""
Unregister service provider.
@@ -320,6 +368,7 @@ def removeServiceProvider(provider):
import services
services.removeServiceProvider(provider)
+
def addTransportProvider(provider):
"""
Register transport provider.
@@ -329,6 +378,7 @@ def addTransportProvider(provider):
import transport
transport.addTransportProvider(provider)
+
def removeTransportProvider(provider):
"""
Unregister transport provider.
@@ -338,8 +388,10 @@ def removeTransportProvider(provider):
import transport
transport.removeTransportProvider(provider)
+
class Timer(object):
timer_cnt = 0
+
def __init__(self, time, run, *args, **kwargs):
self.id = Timer.timer_cnt
Timer.timer_cnt += 1
@@ -349,16 +401,23 @@ class Timer(object):
self.kwargs = kwargs
def __cmp__(self, x):
- if x is self: return 0
- if self.time < x.time: return -1
- if self.time > x.time: return +1
- if self.id < x.id: return -1
- if self.id > x.id: return +1
+ if x is self:
+ return 0
+ if self.time < x.time:
+ return -1
+ if self.time > x.time:
+ return +1
+ if self.id < x.id:
+ return -1
+ if self.id > x.id:
+ return +1
assert False
return 0
_timer_queue_lock = threading.Condition()
_timer_queue = []
+
+
def _dispatch_timers():
try:
with _timer_queue_lock:

Back to the top