diff options
Diffstat (limited to 'python/src/tcf/services/streams.py')
-rw-r--r-- | python/src/tcf/services/streams.py | 100 |
1 files changed, 67 insertions, 33 deletions
diff --git a/python/src/tcf/services/streams.py b/python/src/tcf/services/streams.py index 446941b23..6e92054e2 100644 --- a/python/src/tcf/services/streams.py +++ b/python/src/tcf/services/streams.py @@ -1,5 +1,5 @@ -# ******************************************************************************* -# * Copyright (c) 2011 Wind River Systems, Inc. and others. +# ***************************************************************************** +# * Copyright (c) 2011, 2012 Wind River Systems, Inc. and others. # * All rights reserved. This program and the accompanying materials # * are made available under the terms of the Eclipse Public License v1.0 # * which accompanies this distribution, and is available at @@ -7,32 +7,40 @@ # * # * Contributors: # * Wind River Systems - initial API and implementation -# ******************************************************************************* +# ***************************************************************************** """ -Streams service is a generic interface to support streaming of data between host and remote agents. +Streams service is a generic interface to support streaming of data between +host and remote agents. The service supports: - 1. Asynchronous overlapped data streaming: multiple 'read' or 'write' command can be issued at same time, both peers - can continue data processing concurrently with data transmission. + 1. Asynchronous overlapped data streaming: multiple 'read' or 'write' command + can be issued at same time, both peers can continue data processing + concurrently with data transmission. 2. Multicast: multiple clients can receive data from same stream. - 3. Subscription model: clients are required to expressed interest in particular streams by subscribing for the service. - 4. Flow control: peers can throttle data flow of individual streams by delaying 'read' and 'write' commands. + 3. Subscription model: clients are required to expressed interest in + particular streams by subscribing for the service. + 4. Flow control: peers can throttle data flow of individual streams by + delaying 'read' and 'write' commands. """ from tcf import services NAME = "Streams" + class StreamsService(services.Service): def getName(self): return NAME def subscribe(self, stream_type, listener, done): """ - Clients must subscribe for one or more stream types to be able to send or receive stream data. - Subscribers receive notifications when a stream of given type is created or disposed. - Subscribers are required to respond with 'read' or 'disconnect' commands as necessary. + Clients must subscribe for one or more stream types to be able to send + or receive stream data. + Subscribers receive notifications when a stream of given type is + created or disposed. + Subscribers are required to respond with 'read' or 'disconnect' + commands as necessary. @param stream_type - the stream source type. @param listener - client implementation of StreamsListener interface. @param done - command result call back object. @@ -52,13 +60,18 @@ class StreamsService(services.Service): def read(self, stream_id, size, done): """ - Read data from a stream. If stream buffer is empty, the command will wait until data is available. - Remote peer will continue to process other commands while 'read' command is pending. - Client can send more 'read' commands without waiting for the first command to complete. + Read data from a stream. If stream buffer is empty, the command will + wait until data is available. + Remote peer will continue to process other commands while 'read' + command is pending. + Client can send more 'read' commands without waiting for the first + command to complete. Doing that improves communication channel bandwidth utilization. Pending 'read' commands will be executed in same order as issued. - Client can delay sending of 'read' command if it is not ready to receive more data, - however, delaying for too long can cause stream buffer overflow and lost of data. + Client can delay sending of 'read' command if it is not ready to + receive more data, + however, delaying for too long can cause stream buffer overflow and + lost of data. @param stream_id - ID of the stream. @param size - max number of bytes to read. @param done - command result call back object. @@ -68,9 +81,12 @@ class StreamsService(services.Service): def write(self, stream_id, buf, offset, size, done): """ - Write data to a stream. If stream buffer is full, the command will wait until space is available. - Remote peer will continue to process other commands while 'write' command is pending. - Client can send more 'write' commands without waiting for the first command to complete. + Write data to a stream. If stream buffer is full, the command will wait + until space is available. + Remote peer will continue to process other commands while 'write' + command is pending. + Client can send more 'write' commands without waiting for the first + command to complete. Doing that improves communication channel bandwidth utilization. Pending 'write' commands will be executed in same order as issued. @param stream_id - ID of the stream. @@ -84,7 +100,8 @@ class StreamsService(services.Service): def eos(self, stream_id, done): """ - Send End Of Stream marker to a stream. No more writing to the stream is allowed after that. + Send End Of Stream marker to a stream. No more writing to the stream is + allowed after that. @param stream_id - ID of the stream. @param done - command result call back object. @return - pending command handle. @@ -94,9 +111,12 @@ class StreamsService(services.Service): def connect(self, stream_id, done): """ Connect client to a stream. - Some data might be dropped from the stream by the time "connect" command is executed. - Client should be able to re-sync with stream data if it wants to read from such stream. - If a client wants to read a stream from the beginning it should use "subscribe" command + Some data might be dropped from the stream by the time "connect" + command is executed. + Client should be able to re-sync with stream data if it wants to read + from such stream. + If a client wants to read a stream from the beginning it should use + "subscribe" command instead of "connect". @param stream_id - ID of the stream. @param done - command result call back object. @@ -117,11 +137,15 @@ class StreamsService(services.Service): class StreamsListener(object): """ Clients can implement StreamsListener interface to be notified - when a stream is created or disposed. The interface is registered with 'subscribe' command. + when a stream is created or disposed. The interface is registered with + 'subscribe' command. - When new stream is created, client must decide if it is interested in that particular stream instance. - If not interested, client should send 'disconnect' command to allow remote peer to free resources and bandwidth. - If not disconnected, client is required to send 'read' commands as necessary to prevent stream buffer overflow. + When new stream is created, client must decide if it is interested in that + particular stream instance. + If not interested, client should send 'disconnect' command to allow remote + peer to free resources and bandwidth. + If not disconnected, client is required to send 'read' commands as + necessary to prevent stream buffer overflow. """ def created(self, stream_type, stream_id, context_id): @@ -129,9 +153,11 @@ class StreamsListener(object): Called when a new stream is created. @param stream_type - source type of the stream. @param stream_id - ID of the stream. - @param context_id - a context ID that is associated with the stream, or None. + @param context_id - a context ID that is associated with the stream, + or None. Exact meaning of the context ID depends on stream type. - Stream types and context IDs are defined by services that use Streams service to transmit data. + Stream types and context IDs are defined by services that use Streams + service to transmit data. """ pass @@ -143,6 +169,7 @@ class StreamsListener(object): """ pass + class DoneSubscribe(object): """ Call back interface for 'subscribe' command. @@ -150,6 +177,7 @@ class DoneSubscribe(object): def doneSubscribe(self, token, error): pass + class DoneUnsubscribe(object): """ Call back interface for 'unsubscribe' command. @@ -157,6 +185,7 @@ class DoneUnsubscribe(object): def doneUnsubscribe(self, token, error): pass + class DoneRead(object): """ Call back interface for 'read' command. @@ -166,15 +195,17 @@ class DoneRead(object): Called when 'read' command is done. @param token - command handle. @param error - error object or None. - @param lost_size - number of bytes that were lost because of buffer overflow. - 'lost_size' -1 means unknown number of bytes were lost. - if both 'lost_size' and 'data.length' are non-zero then lost bytes are considered - located right before read bytes. + @param lost_size - number of bytes that were lost because of buffer + overflow. 'lost_size' -1 means unknown number of + bytes were lost. If both 'lost_size' and + 'data.length' are non-zero then lost bytes are + considered located right before read bytes. @param data - bytes read from the stream. @param eos - true if end of stream was reached. """ pass + class DoneWrite(object): """ Call back interface for 'write' command. @@ -187,6 +218,7 @@ class DoneWrite(object): """ pass + class DoneEOS(object): """ Call back interface for 'eos' command. @@ -199,6 +231,7 @@ class DoneEOS(object): """ pass + class DoneConnect(object): """ Call back interface for 'connect' command. @@ -211,6 +244,7 @@ class DoneConnect(object): """ pass + class DoneDisconnect(object): """ Call back interface for 'disconnect' command. |