diff options
-rw-r--r-- | agent/tcf/services/streamsservice.c | 195 | ||||
-rw-r--r-- | agent/tcf/services/streamsservice.h | 8 |
2 files changed, 149 insertions, 54 deletions
diff --git a/agent/tcf/services/streamsservice.c b/agent/tcf/services/streamsservice.c index ea34a715..f295ccdf 100644 --- a/agent/tcf/services/streamsservice.c +++ b/agent/tcf/services/streamsservice.c @@ -572,14 +572,9 @@ void virtual_stream_delete(VirtualStream * stream) { post_event(delete_stream, stream); } -static void command_subscribe(char * token, Channel * c) { - char type[256]; - int err = 0; +int virtual_stream_subscribe (Channel * c, const char * type) { LINK * l; - - json_read_string(&c->inp, type, sizeof(type)); - json_test_char(&c->inp, MARKER_EOA); - json_test_char(&c->inp, MARKER_EOM); + int err = 0; for (l = subscriptions.next; l != &subscriptions;) { Subscription * h = all2subscription(l); @@ -596,23 +591,32 @@ static void command_subscribe(char * token, Channel * c) { strlcpy(s->type, type, sizeof(s->type)); s->channel = c; } + else errno = err; - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); + return err == 0 ? 0 : -1; } -static void command_unsubscribe(char * token, Channel * c) { +static void command_subscribe(char * token, Channel * c) { char type[256]; int err = 0; - Subscription * s = NULL; - LINK * l; json_read_string(&c->inp, type, sizeof(type)); json_test_char(&c->inp, MARKER_EOA); json_test_char(&c->inp, MARKER_EOM); + if (virtual_stream_subscribe(c, type) < 0) err = errno; + + write_stringz(&c->out, "R"); + write_stringz(&c->out, token); + write_errno(&c->out, err); + write_stream(&c->out, MARKER_EOM); +} + +int virtual_stream_unsubscribe (Channel * c, const char * type) { + LINK * l; + int err = 0; + Subscription * s = NULL; + for (l = subscriptions.next; l != &subscriptions;) { Subscription * h = all2subscription(l); l = l->next; @@ -623,6 +627,20 @@ static void command_unsubscribe(char * token, Channel * c) { } if (s == NULL) err = ERR_INV_CONTEXT; if (err == 0) delete_subscription(s); + else errno = err; + + return err == 0 ? 0 : -1; +} + +static void command_unsubscribe(char * token, Channel * c) { + char type[256]; + int err = 0; + + json_read_string(&c->inp, type, sizeof(type)); + json_test_char(&c->inp, MARKER_EOA); + json_test_char(&c->inp, MARKER_EOM); + + if (virtual_stream_unsubscribe(c, type) < 0) err = errno; write_stringz(&c->out, "R"); write_stringz(&c->out, token); @@ -630,19 +648,10 @@ static void command_unsubscribe(char * token, Channel * c) { write_stream(&c->out, MARKER_EOM); } -static void command_read(char * token, Channel * c) { - char id[256]; - size_t size; - StreamClient * client; +int virtual_stream_read (Channel * c, char * token, char * id, size_t size) { int err = 0; + StreamClient * client= find_client(id, c); - json_read_string(&c->inp, id, sizeof(id)); - json_test_char(&c->inp, MARKER_EOA); - size = json_read_ulong(&c->inp); - json_test_char(&c->inp, MARKER_EOA); - json_test_char(&c->inp, MARKER_EOM); - - client = find_client(id, c); if (client == NULL) err = errno; if (!err && (client->stream->access & VS_ENABLE_REMOTE_READ) == 0) err = ERR_UNSUPPORTED; @@ -663,7 +672,30 @@ static void command_read(char * token, Channel * c) { advance_stream_buffer(stream); } } - else { + else errno = err; + + return err == 0 ? 0 : -1; +} + +static void command_read(char * token, Channel * c) { + char id[256]; + size_t size; + int err = 0; + + json_read_string(&c->inp, id, sizeof(id)); + json_test_char(&c->inp, MARKER_EOA); + size = json_read_ulong(&c->inp); + json_test_char(&c->inp, MARKER_EOA); + json_test_char(&c->inp, MARKER_EOM); + + if (virtual_stream_read (c, token, id, size) < 0) err = errno; + + if (err != 0) { + /* + * Handle reply with an error. If none error was detected, the reply + * was sent back by virtual_stream_read() or delayed. + */ + write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_stringz(&c->out, "null"); @@ -676,20 +708,12 @@ static void command_read(char * token, Channel * c) { } } -static void command_write(char * token, Channel * c) { - char id[256]; - StreamClient * client; - long size; - long offs = 0; +int virtual_stream_write (Channel * c, char * token, char * id, size_t size, InputStream * inp) { char * data = NULL; int err = 0; + long offs = 0; + StreamClient * client = find_client(id, c); - json_read_string(&c->inp, id, sizeof(id)); - json_test_char(&c->inp, MARKER_EOA); - size = json_read_long(&c->inp); - json_test_char(&c->inp, MARKER_EOA); - - client = find_client(id, c); if (client == NULL) err = errno; if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED; @@ -699,7 +723,7 @@ static void command_write(char * token, Channel * c) { if (!err && !list_is_empty(&client->write_requests)) data = (char *)loc_alloc(size); - json_read_binary_start(&state, &c->inp); + json_read_binary_start(&state, inp); for (;;) { if (data != NULL) { size_t rd = json_read_binary_data(&state, data + data_pos, size - offs - data_pos); @@ -726,9 +750,6 @@ static void command_write(char * token, Channel * c) { json_read_binary_end(&state); } - json_test_char(&c->inp, MARKER_EOA); - json_test_char(&c->inp, MARKER_EOM); - if (data != NULL) { WriteRequest * r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest)); list_init(&r->link_client); @@ -738,26 +759,51 @@ static void command_write(char * token, Channel * c) { strlcpy(r->token, token, sizeof(r->token)); list_add_last(&r->link_client, &client->write_requests); } - else { + else if (err == 0) { write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_errno(&c->out, err); write_stream(&c->out, MARKER_EOM); } + + if (err != 0) errno = err; + + return err == 0 ? 0 : -1; } -static void command_eos(char * token, Channel * c) { +static void command_write(char * token, Channel * c) { char id[256]; - StreamClient * client; - size_t done = 0; - WriteRequest * r = NULL; + long size; int err = 0; - json_read_string(&c->inp, id, sizeof(id)); json_test_char(&c->inp, MARKER_EOA); + size = json_read_long(&c->inp); + json_test_char(&c->inp, MARKER_EOA); + + if (virtual_stream_write(c, token, id, size, &c->inp) < 0) err = errno; + + json_test_char(&c->inp, MARKER_EOA); json_test_char(&c->inp, MARKER_EOM); - client = find_client(id, c); + if (err != 0) { + /* + * Handle reply with an error. If none error was detected, the reply + * was sent back by virtual_stream_write() or delayed. + */ + + write_stringz(&c->out, "R"); + write_stringz(&c->out, token); + write_errno(&c->out, err); + write_stream(&c->out, MARKER_EOM); + } +} + +int virtual_stream_eos (Channel * c, char * token, char * id) { + size_t done = 0; + WriteRequest * r = NULL; + int err = 0; + StreamClient * client = find_client(id, c); + if (client == NULL) err = errno; if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED; if (!err && !list_is_empty(&client->write_requests)) r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest)); @@ -770,15 +816,19 @@ static void command_eos(char * token, Channel * c) { strlcpy(r->token, token, sizeof(r->token)); list_add_last(&r->link_client, &client->write_requests); } - else { + else if (err == 0) { write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_errno(&c->out, err); write_stream(&c->out, MARKER_EOM); } + + if (err != 0) errno = err; + + return err == 0 ? 0 : -1; } -static void command_connect(char * token, Channel * c) { +static void command_eos(char * token, Channel * c) { char id[256]; int err = 0; @@ -786,30 +836,67 @@ static void command_connect(char * token, Channel * c) { json_test_char(&c->inp, MARKER_EOA); json_test_char(&c->inp, MARKER_EOM); + if (virtual_stream_eos(c, token, id) < 0) err = errno; + + if (err != 0) { + /* + * Handle reply with an error. If none error was detected, the reply + * was sent back by virtual_stream_eos() or delayed. + */ + write_stringz(&c->out, "R"); + write_stringz(&c->out, token); + write_errno(&c->out, err); + write_stream(&c->out, MARKER_EOM); + } +} + +int virtual_stream_connect (Channel * c, char * token, char * id) { + int err = 0; + if (find_client(id, c) == NULL) { VirtualStream * stream = virtual_stream_find(id); if (stream == NULL) err = errno; else create_client(stream, c); } + return err == 0 ? 0 : -1; +} + +static void command_connect(char * token, Channel * c) { + char id[256]; + int err = 0; + + json_read_string(&c->inp, id, sizeof(id)); + json_test_char(&c->inp, MARKER_EOA); + json_test_char(&c->inp, MARKER_EOM); + + if (virtual_stream_connect (c, token, id) < 0) err = errno; + write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_errno(&c->out, err); write_stream(&c->out, MARKER_EOM); } +int virtual_stream_disconnect (Channel * c, char * token, char * id) { + int err = 0; + StreamClient * client = find_client(id, c); + + if (client == NULL) err = errno; + if (!err) delete_client(client); + + return err == 0 ? 0 : -1; +} + static void command_disconnect(char * token, Channel * c) { char id[256]; - StreamClient * client; int err = 0; json_read_string(&c->inp, id, sizeof(id)); json_test_char(&c->inp, MARKER_EOA); json_test_char(&c->inp, MARKER_EOM); - client = find_client(id, c); - if (client == NULL) err = errno; - if (!err) delete_client(client); + if (virtual_stream_disconnect (c, token, id) < 0) err = errno; write_stringz(&c->out, "R"); write_stringz(&c->out, token); diff --git a/agent/tcf/services/streamsservice.h b/agent/tcf/services/streamsservice.h index a24f906d..f598a079 100644 --- a/agent/tcf/services/streamsservice.h +++ b/agent/tcf/services/streamsservice.h @@ -43,6 +43,14 @@ extern int virtual_stream_add_data(VirtualStream * stream, char * buf, size_t bu extern int virtual_stream_get_data(VirtualStream * stream, char * buf, size_t buf_size, size_t * data_size, int * eos); extern int virtual_stream_is_empty(VirtualStream * stream); +extern int virtual_stream_eos (Channel * c, char * token, char * id); +extern int virtual_stream_write (Channel * c, char * token, char * id, size_t size, InputStream * inp); +extern int virtual_stream_read (Channel * c, char * token, char * id, size_t size); +extern int virtual_stream_unsubscribe (Channel * c, const char * type); +extern int virtual_stream_subscribe (Channel * c, const char * type); +extern int virtual_stream_connect (Channel * c, char * token, char * id); +extern int virtual_stream_disconnect (Channel * c, char * token, char * id); + /* * Initialize streams service. */ |