Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'python/src/tcf/services/streams.py')
-rw-r--r--python/src/tcf/services/streams.py100
1 files changed, 67 insertions, 33 deletions
diff --git a/python/src/tcf/services/streams.py b/python/src/tcf/services/streams.py
index 446941b23..6e92054e2 100644
--- a/python/src/tcf/services/streams.py
+++ b/python/src/tcf/services/streams.py
@@ -1,5 +1,5 @@
-# *******************************************************************************
-# * Copyright (c) 2011 Wind River Systems, Inc. and others.
+# *****************************************************************************
+# * Copyright (c) 2011, 2012 Wind River Systems, Inc. and others.
# * All rights reserved. This program and the accompanying materials
# * are made available under the terms of the Eclipse Public License v1.0
# * which accompanies this distribution, and is available at
@@ -7,32 +7,40 @@
# *
# * Contributors:
# * Wind River Systems - initial API and implementation
-# *******************************************************************************
+# *****************************************************************************
"""
-Streams service is a generic interface to support streaming of data between host and remote agents.
+Streams service is a generic interface to support streaming of data between
+host and remote agents.
The service supports:
- 1. Asynchronous overlapped data streaming: multiple 'read' or 'write' command can be issued at same time, both peers
- can continue data processing concurrently with data transmission.
+ 1. Asynchronous overlapped data streaming: multiple 'read' or 'write' command
+ can be issued at same time, both peers can continue data processing
+ concurrently with data transmission.
2. Multicast: multiple clients can receive data from same stream.
- 3. Subscription model: clients are required to expressed interest in particular streams by subscribing for the service.
- 4. Flow control: peers can throttle data flow of individual streams by delaying 'read' and 'write' commands.
+ 3. Subscription model: clients are required to expressed interest in
+ particular streams by subscribing for the service.
+ 4. Flow control: peers can throttle data flow of individual streams by
+ delaying 'read' and 'write' commands.
"""
from tcf import services
NAME = "Streams"
+
class StreamsService(services.Service):
def getName(self):
return NAME
def subscribe(self, stream_type, listener, done):
"""
- Clients must subscribe for one or more stream types to be able to send or receive stream data.
- Subscribers receive notifications when a stream of given type is created or disposed.
- Subscribers are required to respond with 'read' or 'disconnect' commands as necessary.
+ Clients must subscribe for one or more stream types to be able to send
+ or receive stream data.
+ Subscribers receive notifications when a stream of given type is
+ created or disposed.
+ Subscribers are required to respond with 'read' or 'disconnect'
+ commands as necessary.
@param stream_type - the stream source type.
@param listener - client implementation of StreamsListener interface.
@param done - command result call back object.
@@ -52,13 +60,18 @@ class StreamsService(services.Service):
def read(self, stream_id, size, done):
"""
- Read data from a stream. If stream buffer is empty, the command will wait until data is available.
- Remote peer will continue to process other commands while 'read' command is pending.
- Client can send more 'read' commands without waiting for the first command to complete.
+ Read data from a stream. If stream buffer is empty, the command will
+ wait until data is available.
+ Remote peer will continue to process other commands while 'read'
+ command is pending.
+ Client can send more 'read' commands without waiting for the first
+ command to complete.
Doing that improves communication channel bandwidth utilization.
Pending 'read' commands will be executed in same order as issued.
- Client can delay sending of 'read' command if it is not ready to receive more data,
- however, delaying for too long can cause stream buffer overflow and lost of data.
+ Client can delay sending of 'read' command if it is not ready to
+ receive more data,
+ however, delaying for too long can cause stream buffer overflow and
+ lost of data.
@param stream_id - ID of the stream.
@param size - max number of bytes to read.
@param done - command result call back object.
@@ -68,9 +81,12 @@ class StreamsService(services.Service):
def write(self, stream_id, buf, offset, size, done):
"""
- Write data to a stream. If stream buffer is full, the command will wait until space is available.
- Remote peer will continue to process other commands while 'write' command is pending.
- Client can send more 'write' commands without waiting for the first command to complete.
+ Write data to a stream. If stream buffer is full, the command will wait
+ until space is available.
+ Remote peer will continue to process other commands while 'write'
+ command is pending.
+ Client can send more 'write' commands without waiting for the first
+ command to complete.
Doing that improves communication channel bandwidth utilization.
Pending 'write' commands will be executed in same order as issued.
@param stream_id - ID of the stream.
@@ -84,7 +100,8 @@ class StreamsService(services.Service):
def eos(self, stream_id, done):
"""
- Send End Of Stream marker to a stream. No more writing to the stream is allowed after that.
+ Send End Of Stream marker to a stream. No more writing to the stream is
+ allowed after that.
@param stream_id - ID of the stream.
@param done - command result call back object.
@return - pending command handle.
@@ -94,9 +111,12 @@ class StreamsService(services.Service):
def connect(self, stream_id, done):
"""
Connect client to a stream.
- Some data might be dropped from the stream by the time "connect" command is executed.
- Client should be able to re-sync with stream data if it wants to read from such stream.
- If a client wants to read a stream from the beginning it should use "subscribe" command
+ Some data might be dropped from the stream by the time "connect"
+ command is executed.
+ Client should be able to re-sync with stream data if it wants to read
+ from such stream.
+ If a client wants to read a stream from the beginning it should use
+ "subscribe" command
instead of "connect".
@param stream_id - ID of the stream.
@param done - command result call back object.
@@ -117,11 +137,15 @@ class StreamsService(services.Service):
class StreamsListener(object):
"""
Clients can implement StreamsListener interface to be notified
- when a stream is created or disposed. The interface is registered with 'subscribe' command.
+ when a stream is created or disposed. The interface is registered with
+ 'subscribe' command.
- When new stream is created, client must decide if it is interested in that particular stream instance.
- If not interested, client should send 'disconnect' command to allow remote peer to free resources and bandwidth.
- If not disconnected, client is required to send 'read' commands as necessary to prevent stream buffer overflow.
+ When new stream is created, client must decide if it is interested in that
+ particular stream instance.
+ If not interested, client should send 'disconnect' command to allow remote
+ peer to free resources and bandwidth.
+ If not disconnected, client is required to send 'read' commands as
+ necessary to prevent stream buffer overflow.
"""
def created(self, stream_type, stream_id, context_id):
@@ -129,9 +153,11 @@ class StreamsListener(object):
Called when a new stream is created.
@param stream_type - source type of the stream.
@param stream_id - ID of the stream.
- @param context_id - a context ID that is associated with the stream, or None.
+ @param context_id - a context ID that is associated with the stream,
+ or None.
Exact meaning of the context ID depends on stream type.
- Stream types and context IDs are defined by services that use Streams service to transmit data.
+ Stream types and context IDs are defined by services that use Streams
+ service to transmit data.
"""
pass
@@ -143,6 +169,7 @@ class StreamsListener(object):
"""
pass
+
class DoneSubscribe(object):
"""
Call back interface for 'subscribe' command.
@@ -150,6 +177,7 @@ class DoneSubscribe(object):
def doneSubscribe(self, token, error):
pass
+
class DoneUnsubscribe(object):
"""
Call back interface for 'unsubscribe' command.
@@ -157,6 +185,7 @@ class DoneUnsubscribe(object):
def doneUnsubscribe(self, token, error):
pass
+
class DoneRead(object):
"""
Call back interface for 'read' command.
@@ -166,15 +195,17 @@ class DoneRead(object):
Called when 'read' command is done.
@param token - command handle.
@param error - error object or None.
- @param lost_size - number of bytes that were lost because of buffer overflow.
- 'lost_size' -1 means unknown number of bytes were lost.
- if both 'lost_size' and 'data.length' are non-zero then lost bytes are considered
- located right before read bytes.
+ @param lost_size - number of bytes that were lost because of buffer
+ overflow. 'lost_size' -1 means unknown number of
+ bytes were lost. If both 'lost_size' and
+ 'data.length' are non-zero then lost bytes are
+ considered located right before read bytes.
@param data - bytes read from the stream.
@param eos - true if end of stream was reached.
"""
pass
+
class DoneWrite(object):
"""
Call back interface for 'write' command.
@@ -187,6 +218,7 @@ class DoneWrite(object):
"""
pass
+
class DoneEOS(object):
"""
Call back interface for 'eos' command.
@@ -199,6 +231,7 @@ class DoneEOS(object):
"""
pass
+
class DoneConnect(object):
"""
Call back interface for 'connect' command.
@@ -211,6 +244,7 @@ class DoneConnect(object):
"""
pass
+
class DoneDisconnect(object):
"""
Call back interface for 'disconnect' command.

Back to the top