Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'python/src/tcf/channel/AbstractChannel.py')
-rw-r--r--python/src/tcf/channel/AbstractChannel.py324
1 files changed, 210 insertions, 114 deletions
diff --git a/python/src/tcf/channel/AbstractChannel.py b/python/src/tcf/channel/AbstractChannel.py
index f313cc252..e700af122 100644
--- a/python/src/tcf/channel/AbstractChannel.py
+++ b/python/src/tcf/channel/AbstractChannel.py
@@ -1,5 +1,5 @@
-# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# *****************************************************************************
+# * Copyright (c) 2011, 2013 Wind River Systems, Inc. and others.
# * All rights reserved. This program and the accompanying materials
# * are made available under the terms of the Eclipse Public License v1.0
# * which accompanies this distribution, and is available at
@@ -7,16 +7,20 @@
# *
# * Contributors:
# * Wind River Systems - initial API and implementation
-# *******************************************************************************
+# *****************************************************************************
-import sys, threading, time, types
-from tcf import protocol, transport, services, peer, errors
-from tcf.services import locator
-from tcf.channel import STATE_CLOSED, STATE_OPEN, STATE_OPENING
-from tcf.channel import Token, fromJSONSequence, toJSONSequence
+import sys
+import threading
+import time
+import types
-EOS = -1 # End Of Stream
-EOM = -2 # End Of Message
+from .. import protocol, transport, services, peer, errors
+from ..services import locator
+from ..channel import STATE_CLOSED, STATE_OPEN, STATE_OPENING
+from ..channel import Token, fromJSONSequence, toJSONSequence
+
+EOS = -1 # End Of Stream
+EOM = -2 # End Of Message
class Message(object):
@@ -31,9 +35,11 @@ class Message(object):
self.is_sent = None
self.token = None
self.trace = ()
+
def __str__(self):
return "%s %s %s" % (self.type, self.service, self.name)
+
class ReaderThread(threading.Thread):
def __init__(self, channel, handleInput):
super(ReaderThread, self).__init__(name="TCF Reader Thread")
@@ -52,9 +58,13 @@ class ReaderThread(threading.Thread):
while True:
n = self.channel.read()
if n <= 0:
- if n == end: break
- if n == EOM: raise IOError("Unexpected end of message")
- if n < 0: raise IOError("Communication channel is closed by remote peer")
+ if n == end:
+ break
+ if n == EOM:
+ raise IOError("Unexpected end of message")
+ if n < 0:
+ raise IOError("Communication channel is closed by " + \
+ "remote peer")
buf.append(n)
return buf
@@ -66,18 +76,21 @@ class ReaderThread(threading.Thread):
try:
while True:
n = self.channel.read()
- if n == EOM: continue
+ if n == EOM:
+ continue
if n == EOS:
try:
self.eos_err_report = self.readBytes(EOM)
reportLen = len(self.eos_err_report)
- if reportLen == 0 or reportLen == 1 and self.eos_err_report[0] == 0:
+ if reportLen == 0 or reportLen == 1 and \
+ self.eos_err_report[0] == 0:
self.eos_err_report = None
except:
pass
break
msg = Message(n)
- if self.channel.read() != 0: self.error()
+ if self.channel.read() != 0:
+ self.error()
typeCode = msg.type
if typeCode == 'C':
msg.token = Token(self.readBytes(0))
@@ -97,7 +110,8 @@ class ReaderThread(threading.Thread):
self.error()
protocol.invokeLater(self.handleInput, msg)
delay = self.channel.local_congestion_level
- if delay > 0: time.sleep(delay / 1000.0)
+ if delay > 0:
+ time.sleep(delay / 1000.0)
protocol.invokeLater(self.handleEOS)
except Exception as x:
try:
@@ -124,18 +138,22 @@ class ReaderThread(threading.Thread):
class AbstractChannel(object):
"""
- AbstractChannel implements communication link connecting two end points (peers).
- The channel asynchronously transmits messages: commands, results and events.
+ AbstractChannel implements communication link connecting two end points
+ (peers).
+ The channel asynchronously transmits messages: commands, results and
+ events.
- Clients can subclass AbstractChannel to support particular transport (wire) protocol.
+ Clients can subclass AbstractChannel to support particular transport (wire)
+ protocol.
Also, see StreamChannel for stream oriented transport protocols.
"""
def __init__(self, remote_peer, local_peer=None):
self.remote_peer = remote_peer
- self.local_peer = local_peer # TODO
+ self.local_peer = local_peer # TODO
self.inp_thread = ReaderThread(self, self.__handleInput)
- self.out_thread = threading.Thread(target=self.__write_output,name="TCF Channel Transmitter")
+ self.out_thread = threading.Thread(target=self.__write_output,
+ name="TCF Channel Transmitter")
self.out_thread.daemon = True
self.out_tokens = {}
self.out_queue = []
@@ -171,10 +189,12 @@ class AbstractChannel(object):
while len(self.out_queue) == 0:
self.out_lock.wait()
msg = self.out_queue.pop(0)
- if not msg: break
+ if not msg:
+ break
last = len(self.out_queue) == 0
if msg.is_canceled:
- if last: self.flush()
+ if last:
+ self.flush()
continue
msg.is_sent = True
if msg.trace:
@@ -195,10 +215,13 @@ class AbstractChannel(object):
self.write(EOM)
delay = 0
level = self.remote_congestion_level
- if level > 0: delay = level * 10
- if last or delay > 0: self.flush()
- if delay > 0: time.sleep(delay / 1000.0)
- #else yield()
+ if level > 0:
+ delay = level * 10
+ if last or delay > 0:
+ self.flush()
+ if delay > 0:
+ time.sleep(delay / 1000.0)
+ # else yield()
self.write(EOS)
self.write(EOM)
self.flush()
@@ -212,10 +235,10 @@ class AbstractChannel(object):
def __traceMessageSent(self, m):
for l in m.trace:
try:
- id = None
+ tokenID = None
if m.token is not None:
- id = m.token.getID()
- l.onMessageSent(m.type, id, m.service, m.name, m.data)
+ tokenID = m.token.getID()
+ l.onMessageSent(m.type, tokenID, m.service, m.name, m.data)
except Exception as x:
protocol.log("Exception in channel listener", x)
@@ -227,34 +250,42 @@ class AbstractChannel(object):
def __initServices(self):
try:
- if self.proxy: return
- if self.state == STATE_CLOSED: return
+ if self.proxy:
+ return
+ if self.state == STATE_CLOSED:
+ return
services.onChannelCreated(self, self.local_service_by_name)
- self.__makeServiceByClassMap(self.local_service_by_name, self.local_service_by_class)
+ self.__makeServiceByClassMap(self.local_service_by_name,
+ self.local_service_by_class)
args = self.local_service_by_name.keys()
- self.sendEvent(protocol.getLocator(), "Hello", toJSONSequence((args,)))
+ self.sendEvent(protocol.getLocator(), "Hello",
+ toJSONSequence((args,)))
except IOError as x:
self.terminate(x)
def redirect_id(self, peer_id):
"""
- Redirect this channel to given peer using this channel remote peer locator service as a proxy.
- @param peer_id - peer that will become new remote communication endpoint of this channel
+ Redirect this channel to given peer using this channel remote peer
+ locator service as a proxy.
+ @param peer_id - peer that will become new remote communication
+ endpoint of this channel
"""
- map = {}
- map[peer.ATTR_ID] = peer_id
+ peerMap = {}
+ peerMap[peer.ATTR_ID] = peer_id
self.redirect(map)
def redirect(self, peer_attrs):
"""
- Redirect this channel to given peer using this channel remote peer locator service as a proxy.
- @param peer_attrs - peer that will become new remote communication endpoint of this channel
+ Redirect this channel to given peer using this channel remote peer
+ locator service as a proxy.
+ @param peer_attrs - peer that will become new remote communication
+ endpoint of this channel
"""
if isinstance(peer_attrs, str):
# support for redirect(peerId)
- map = {}
- map[peer.ATTR_ID] = peer_attrs
- peer_attrs = map
+ attrs = {}
+ attrs[peer.ATTR_ID] = peer_attrs
+ peer_attrs = attrs
channel = self
assert protocol.isDispatchThread()
if self.state == STATE_OPENING:
@@ -264,20 +295,28 @@ class AbstractChannel(object):
assert self.redirect_command is None
try:
l = self.remote_service_by_class.get(locator.LocatorService)
- if not l: raise IOError("Cannot redirect channel: peer " +
- self.remote_peer.getID() + " has no locator service")
+ if not l:
+ raise IOError("Cannot redirect channel: peer " +
+ self.remote_peer.getID() +
+ " has no locator service")
peer_id = peer_attrs.get(peer.ATTR_ID)
if peer_id and len(peer_attrs) == 1:
_peer = l.getPeers().get(peer_id)
if not _peer:
- # Peer not found, must wait for a while until peer is discovered or time out
+ # Peer not found, must wait for a while until peer is
+ # discovered or time out
class Callback(object):
found = None
+
def __call__(self):
- if self.found: return
- channel.terminate(Exception("Peer " + peer_id + " not found"))
+ if self.found:
+ return
+ channel.terminate(Exception("Peer " + peer_id +
+ " not found"))
cb = Callback()
- protocol.invokeLaterWithDelay(locator.DATA_RETENTION_PERIOD / 3, cb)
+ delay = locator.DATA_RETENTION_PERIOD / 3
+ protocol.invokeLaterWithDelay(delay, cb)
+
class Listener(locator.LocatorListener):
def peerAdded(self, new_peer):
if new_peer.getID() == peer_id:
@@ -291,33 +330,42 @@ class AbstractChannel(object):
def doneRedirect(self, token, exc):
assert channel.redirect_command is token
channel.redirect_command = None
- if channel.state != STATE_OPENING: return
- if exc: channel.terminate(exc)
+ if channel.state != STATE_OPENING:
+ return
+ if exc:
+ channel.terminate(exc)
channel.remote_peer = _peer
channel.remote_service_by_class.clear()
channel.remote_service_by_name.clear()
channel.event_listeners.clear()
- self.redirect_command = l.redirect(peer_id, DoneRedirect())
+ self.redirect_command = l.redirect(peer_id,
+ DoneRedirect())
else:
class TransientPeer(peer.TransientPeer):
def __init__(self, peer_attrs, parent):
super(TransientPeer, self).__init__(peer_attrs)
self.parent = parent
+
def openChannel(self):
c = self.parent.openChannel()
c.redirect(peer_attrs)
+
class DoneRedirect(locator.DoneRedirect):
def doneRedirect(self, token, exc):
assert channel.redirect_command is token
channel.redirect_command = None
- if channel.state != STATE_OPENING: return
- if exc: channel.terminate(exc)
+ if channel.state != STATE_OPENING:
+ return
+ if exc:
+ channel.terminate(exc)
parent = channel.remote_peer
- channel.remote_peer = TransientPeer(peer_attrs, parent)
+ channel.remote_peer = TransientPeer(peer_attrs,
+ parent)
channel.remote_service_by_class.clear()
channel.remote_service_by_name.clear()
channel.event_listeners.clear()
- self.redirect_command = l.redirect(peer_attrs, DoneRedirect())
+ self.redirect_command = l.redirect(peer_attrs,
+ DoneRedirect())
self.state = STATE_OPENING
except Exception as x:
self.terminate(x)
@@ -325,7 +373,8 @@ class AbstractChannel(object):
def __makeServiceByClassMap(self, by_name, by_class):
for service in by_name.values():
for clazz in service.__class__.__bases__:
- if clazz == services.Service: continue
+ if clazz == services.Service:
+ continue
# TODO
# if (!IService.class.isAssignableFrom(fs)) continue
by_class[clazz] = service
@@ -352,27 +401,29 @@ class AbstractChannel(object):
def removeTraceListener(self, listener):
self.trace_listeners = self.trace_listeners[:]
self.trace_listeners.remove(listener)
- if len(self.trace_listeners) == 0: self.trace_listeners = None
+ if len(self.trace_listeners) == 0:
+ self.trace_listeners = None
def addEventListener(self, service, listener):
assert protocol.isDispatchThread()
svc_name = str(service)
listener.svc_name = svc_name
- list = self.event_listeners.get(svc_name) or []
- list.append(listener)
- self.event_listeners[svc_name] = list
+ lst = self.event_listeners.get(svc_name) or []
+ lst.append(listener)
+ self.event_listeners[svc_name] = lst
def removeEventListener(self, service, listener):
assert protocol.isDispatchThread()
svc_name = str(service)
- list = self.event_listeners.get(svc_name)
- if not list: return
- for i in range(len(list)):
- if list[i] is listener:
- if len(list) == 1:
+ lst = self.event_listeners.get(svc_name)
+ if not lst:
+ return
+ for i in range(len(lst)):
+ if lst[i] is listener:
+ if len(lst) == 1:
del self.event_listeners[svc_name]
else:
- del list[i]
+ del lst[i]
return
def addCommandServer(self, service, listener):
@@ -391,7 +442,8 @@ class AbstractChannel(object):
def close(self):
assert protocol.isDispatchThread()
- if self.state == STATE_CLOSED: return
+ if self.state == STATE_CLOSED:
+ return
try:
self.__sendEndOfStream(10000)
self._close(None)
@@ -400,11 +452,13 @@ class AbstractChannel(object):
def terminate(self, error):
assert protocol.isDispatchThread()
- if self.state == STATE_CLOSED: return
+ if self.state == STATE_CLOSED:
+ return
try:
self.__sendEndOfStream(500)
except Exception as x:
- if not error: error = x
+ if not error:
+ error = x
self._close(error)
def __sendEndOfStream(self, timeout):
@@ -433,17 +487,22 @@ class AbstractChannel(object):
except Exception as x:
protocol.log("Exception in channel listener", x)
channel = self
+
class Runnable(object):
def __call__(self):
if channel.out_tokens:
x = None
- if isinstance(error, Exception): x = error
- elif error: x = Exception(error)
- else: x = IOError("Channel is closed")
+ if isinstance(error, Exception):
+ x = error
+ elif error:
+ x = Exception(error)
+ else:
+ x = IOError("Channel is closed")
for msg in channel.out_tokens.values():
try:
s = str(msg)
- if len(s) > 72: s = s[:72] + "...]"
+ if len(s) > 72:
+ s = s[:72] + "...]"
y = IOError("Command " + s + " aborted")
# y.initCause(x)
msg.token.getListener().terminated(msg.token, y)
@@ -452,7 +511,8 @@ class AbstractChannel(object):
channel.out_tokens.clear()
if channel.channel_listeners:
for l in channel.channel_listeners:
- if not l: break
+ if not l:
+ break
try:
l.onChannelClosed(error)
except Exception as x:
@@ -470,8 +530,10 @@ class AbstractChannel(object):
def getCongestion(self):
assert protocol.isDispatchThread()
level = len(self.out_tokens) * 100 / self.pending_command_limit - 100
- if self.remote_congestion_level > level: level = self.remote_congestion_level
- if level > 100: level = 100
+ if self.remote_congestion_level > level:
+ level = self.remote_congestion_level
+ if level > 100:
+ level = 100
return level
def getLocalPeer(self):
@@ -509,15 +571,21 @@ class AbstractChannel(object):
return self.remote_service_by_class.get(cls_or_name)
def setServiceProxy(self, service_interface, service_proxy):
- if not self.notifying_channel_opened: raise Exception("setServiceProxe() can be called only from channel open call-back")
- if not isinstance(self.remote_service_by_name.get(service_proxy.getName()), services.GenericProxy): raise Exception("Proxy already set")
- if self.remote_service_by_class.get(service_interface): raise Exception("Proxy already set")
+ if not self.notifying_channel_opened:
+ raise Exception("setServiceProxe() can be called only from " +
+ "channel open call-back")
+ proxy = self.remote_service_by_name.get(service_proxy.getName())
+ if not isinstance(proxy, services.GenericProxy):
+ raise Exception("Proxy already set")
+ if self.remote_service_by_class.get(service_interface):
+ raise Exception("Proxy already set")
self.remote_service_by_class[service_interface] = service_proxy
self.remote_service_by_name[service_proxy.getName()] = service_proxy
def setProxy(self, proxy, services):
self.proxy = proxy
- self.sendEvent(protocol.getLocator(), "Hello", toJSONSequence((services,)))
+ self.sendEvent(protocol.getLocator(), "Hello",
+ toJSONSequence((services,)))
self.local_service_by_class.clear()
self.local_service_by_name.clear()
@@ -529,21 +597,27 @@ class AbstractChannel(object):
def sendCommand(self, service, name, args, listener):
assert protocol.isDispatchThread()
- if self.state == STATE_OPENING: raise Exception("Channel is waiting for Hello message")
- if self.state == STATE_CLOSED: raise Exception("Channel is closed")
+ if self.state == STATE_OPENING:
+ raise Exception("Channel is waiting for Hello message")
+ if self.state == STATE_CLOSED:
+ raise Exception("Channel is closed")
msg = Message('C')
msg.service = str(service)
msg.name = name
msg.data = args
channel = self
+
class CancelableToken(Token):
def __init__(self, listener):
super(CancelableToken, self).__init__(listener=listener)
+
def cancel(self):
assert protocol.isDispatchThread()
- if channel.state != STATE_OPEN: return False
+ if channel.state != STATE_OPEN:
+ return False
with channel.out_lock:
- if msg.is_sent: return False
+ if msg.is_sent:
+ return False
msg.is_canceled = True
del channel.out_tokens[msg.token.getID()]
return True
@@ -555,7 +629,8 @@ class AbstractChannel(object):
def sendProgress(self, token, results):
assert protocol.isDispatchThread()
- if self.state != STATE_OPEN: raise Exception("Channel is closed")
+ if self.state != STATE_OPEN:
+ raise Exception("Channel is closed")
msg = Message('P')
msg.data = results
msg.token = token
@@ -563,7 +638,8 @@ class AbstractChannel(object):
def sendResult(self, token, results):
assert protocol.isDispatchThread()
- if self.state != STATE_OPEN: raise Exception("Channel is closed")
+ if self.state != STATE_OPEN:
+ raise Exception("Channel is closed")
msg = Message('R')
msg.data = results
msg.token = token
@@ -571,14 +647,16 @@ class AbstractChannel(object):
def rejectCommand(self, token):
assert protocol.isDispatchThread()
- if self.state != STATE_OPEN: raise Exception("Channel is closed")
+ if self.state != STATE_OPEN:
+ raise Exception("Channel is closed")
msg = Message('N')
msg.token = token
self.addToOutQueue(msg)
def sendEvent(self, service, name, args):
assert protocol.isDispatchThread()
- if not (self.state == STATE_OPEN or self.state == STATE_OPENING and isinstance(service, locator.LocatorService)):
+ if not (self.state == STATE_OPEN or self.state == STATE_OPENING and
+ isinstance(service, locator.LocatorService)):
raise Exception("Channel is closed")
msg = Message('E')
msg.service = str(service)
@@ -592,16 +670,18 @@ class AbstractChannel(object):
def __traceMessageReceived(self, m):
for l in self.trace_listeners:
try:
- id = None
+ messageID = None
if m.token is not None:
- id = m.token.getID()
- l.onMessageReceived(m.type, id, m.service, m.name, m.data)
+ messageID = m.token.getID()
+ l.onMessageReceived(m.type, messageID, m.service, m.name,
+ m.data)
except Exception as x:
protocol.log("Exception in channel listener", x)
def __handleInput(self, msg):
assert protocol.isDispatchThread()
- if self.state == STATE_CLOSED: return
+ if self.state == STATE_CLOSED:
+ return
if self.trace_listeners:
self.__traceMessageReceived(msg)
try:
@@ -610,15 +690,18 @@ class AbstractChannel(object):
if typeCode in 'PRN':
token_id = msg.token.getID()
cmd = self.out_tokens.get(token_id)
- if cmd is None: raise Exception("Invalid token received: " + token_id)
+ if cmd is None:
+ raise Exception("Invalid token received: " + token_id)
if typeCode != 'P':
del self.out_tokens[token_id]
token = cmd.token
if typeCode == 'C':
if self.state == STATE_OPENING:
- raise IOError("Received command " + msg.service + "." + msg.name + " before Hello message")
+ raise IOError("Received command " + msg.service + "." +
+ msg.name + " before Hello message")
if self.proxy:
- self.proxy.onCommand(msg.token, msg.service, msg.name, msg.data)
+ self.proxy.onCommand(msg.token, msg.service, msg.name,
+ msg.data)
else:
token = msg.token
cmds = self.command_servers.get(msg.service)
@@ -633,16 +716,19 @@ class AbstractChannel(object):
token.getListener().result(token, msg.data)
self.__sendCongestionLevel()
elif typeCode == 'N':
- token.getListener().terminated(token, errors.ErrorReport(
- "Command is not recognized", errors.TCF_ERROR_INV_COMMAND))
+ report = errors.ErrorReport("Command is not recognized",
+ errors.TCF_ERROR_INV_COMMAND)
+ token.getListener().terminated(token, report)
elif typeCode == 'E':
hello = msg.service == locator.NAME and msg.name == "Hello"
if hello:
self.remote_service_by_name.clear()
self.remote_service_by_class.clear()
data = fromJSONSequence(msg.data)[0]
- services.onChannelOpened(self, data, self.remote_service_by_name)
- self.__makeServiceByClassMap(self.remote_service_by_name, self.remote_service_by_class)
+ services.onChannelOpened(self, data,
+ self.remote_service_by_name)
+ self.__makeServiceByClassMap(self.remote_service_by_name,
+ self.remote_service_by_class)
self.zero_copy = "ZeroCopy" in self.remote_service_by_name
if self.proxy and self.state == STATE_OPEN:
self.proxy.onEvent(msg.service, msg.name, msg.data)
@@ -658,21 +744,24 @@ class AbstractChannel(object):
transport.channelOpened(self)
self.registered_with_trasport = True
for l in self.channel_listeners:
- if not l: break
+ if not l:
+ break
try:
l.onChannelOpened()
except Exception as x:
- protocol.log("Exception in channel listener", x)
+ protocol.log("Exception in channel listener",
+ x)
self.notifying_channel_opened = False
else:
- list = self.event_listeners.get(msg.service)
- if list:
- for l in list:
+ lst = self.event_listeners.get(msg.service)
+ if lst:
+ for l in lst:
l.event(msg.name, msg.data)
self.__sendCongestionLevel()
elif typeCode == 'F':
length = len(msg.data)
- if length > 0 and msg.data[length - 1] == '\0': length -= 1
+ if length > 0 and msg.data[length - 1] == '\0':
+ length -= 1
self.remote_congestion_level = int(msg.data)
else:
assert False
@@ -682,16 +771,21 @@ class AbstractChannel(object):
def __sendCongestionLevel(self):
self.local_congestion_cnt += 1
- if self.local_congestion_cnt < 8: return
+ if self.local_congestion_cnt < 8:
+ return
self.local_congestion_cnt = 0
- if self.state != STATE_OPEN: return
+ if self.state != STATE_OPEN:
+ return
timeVal = int(time.time() * 1000)
- if timeVal - self.local_congestion_time < 500: return
+ if timeVal - self.local_congestion_time < 500:
+ return
assert protocol.isDispatchThread()
level = protocol.getCongestionLevel()
- if level == self.local_congestion_level: return
+ if level == self.local_congestion_level:
+ return
i = (level - self.local_congestion_level) / 8
- if i != 0: level = self.local_congestion_level + i
+ if i != 0:
+ level = self.local_congestion_level + i
self.local_congestion_time = timeVal
with self.out_lock:
msg = None
@@ -721,7 +815,8 @@ class AbstractChannel(object):
The method argument can be one of two special values:
EOS (-1) end of stream marker
EOM (-2) end of message marker.
- The stream can put the byte into a buffer instead of transmitting it right away.
+ The stream can put the byte into a buffer instead of transmitting it
+ right away.
@param n - the data byte.
@raises IOError
"""
@@ -747,7 +842,8 @@ class AbstractChannel(object):
def write(self, buf):
"""
Write array of bytes into the channel output stream.
- The stream can put bytes into a buffer instead of transmitting it right away.
+ The stream can put bytes into a buffer instead of transmitting it right
+ away.
@param buf
@raises IOError
"""

Back to the top