Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'agent/tcf/services/streamsservice.c')
-rw-r--r--agent/tcf/services/streamsservice.c856
1 files changed, 856 insertions, 0 deletions
diff --git a/agent/tcf/services/streamsservice.c b/agent/tcf/services/streamsservice.c
new file mode 100644
index 00000000..0414e5d3
--- /dev/null
+++ b/agent/tcf/services/streamsservice.c
@@ -0,0 +1,856 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2010 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ * You may elect to redistribute this code under either of these licenses.
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+
+/*
+ * TCF Streams - generic streams service.
+ */
+
+#include <config.h>
+
+#if SERVICE_Streams
+
+#include <assert.h>
+#include <string.h>
+#include <framework/channel.h>
+#include <framework/exceptions.h>
+#include <framework/json.h>
+#include <framework/errors.h>
+#include <framework/trace.h>
+#include <framework/events.h>
+#include <framework/myalloc.h>
+#include <services/streamsservice.h>
+
+static const char * STREAMS = "Streams";
+
+typedef struct StreamClient StreamClient;
+typedef struct ReadRequest ReadRequest;
+typedef struct WriteRequest WriteRequest;
+typedef struct Subscription Subscription;
+
+#define STREAM_MAGIC 0x29465398
+
+struct VirtualStream {
+ LINK link_all;
+ unsigned magic;
+ char type[256];
+ unsigned id;
+ unsigned access;
+ VirtualStreamCallBack * callback;
+ void * callback_args;
+ int ref_cnt;
+ int deleted;
+ LINK clients;
+ uint64_t pos;
+ char * buf;
+ size_t buf_len;
+ size_t buf_inp;
+ size_t buf_out;
+ unsigned eos_inp;
+ unsigned eos_out;
+ unsigned data_available_posted;
+ unsigned space_available_posted;
+};
+
+struct StreamClient {
+ LINK link_hash;
+ LINK link_stream;
+ LINK link_all;
+ LINK read_requests;
+ LINK write_requests;
+ VirtualStream * stream;
+ Channel * channel;
+ uint64_t pos;
+};
+
+struct ReadRequest {
+ LINK link_client;
+ StreamClient * client;
+ char token[256];
+ size_t size;
+};
+
+struct WriteRequest {
+ LINK link_client;
+ StreamClient * client;
+ char token[256];
+ char * data;
+ size_t offs;
+ size_t size;
+ int eos;
+};
+
+struct Subscription {
+ LINK link_all;
+ char type[256];
+ Channel * channel;
+};
+
+#define hash2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_hash)))
+#define stream2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_stream)))
+#define all2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_all)))
+#define all2subscription(A) ((Subscription *)((char *)(A) - offsetof(Subscription, link_all)))
+#define all2stream(A) ((VirtualStream *)((char *)(A) - offsetof(VirtualStream, link_all)))
+#define client2read_request(A) ((ReadRequest *)((char *)(A) - offsetof(ReadRequest, link_client)))
+#define client2write_request(A) ((WriteRequest *)((char *)(A) - offsetof(WriteRequest, link_client)))
+
+#define HANDLE_HASH_SIZE (4 * MEM_USAGE_FACTOR - 1)
+static LINK handle_hash[HANDLE_HASH_SIZE];
+static LINK clients;
+static LINK streams;
+static LINK subscriptions;
+static unsigned id_cnt = 0;
+
+static unsigned get_client_hash(unsigned id, Channel * c) {
+ return (id + (unsigned)(uintptr_t)c) % HANDLE_HASH_SIZE;
+}
+
+static int str2id(char * s, unsigned * id) {
+ char * p = NULL;
+ if (*s++ != 'V') return 0;
+ if (*s++ != 'S') return 0;
+ *id = (unsigned)strtoul(s, &p, 10);
+ return *p == 0;
+}
+
+void virtual_stream_get_id(VirtualStream * stream, char * id_buf, size_t buf_size) {
+ snprintf(id_buf, buf_size, "VS%u", stream->id);
+}
+
+static StreamClient * find_client(char * s, Channel * c) {
+ unsigned id = 0;
+ if (str2id(s, &id)) {
+ unsigned h = get_client_hash(id, c);
+ LINK * l = handle_hash[h].next;
+ while (l != &handle_hash[h]) {
+ StreamClient * client = hash2client(l);
+ if (client->stream->id == id && client->channel == c) return client;
+ l = l->next;
+ }
+ }
+ errno = set_fmt_errno(ERR_OTHER, "No such stream: %s", s);
+ return NULL;
+}
+
+static void send_event_stream_created(OutputStream * out, VirtualStream * stream, const char * context_id) {
+ char id[256];
+
+ virtual_stream_get_id(stream, id, sizeof(id));
+ write_stringz(out, "E");
+ write_stringz(out, STREAMS);
+ write_stringz(out, "created");
+
+ json_write_string(out, stream->type);
+ write_stream(out, 0);
+ json_write_string(out, id);
+ write_stream(out, 0);
+ json_write_string(out, context_id);
+ write_stream(out, 0);
+ write_stream(out, MARKER_EOM);
+}
+
+static void send_event_stream_disposed(OutputStream * out, VirtualStream * stream) {
+ char id[256];
+
+ virtual_stream_get_id(stream, id, sizeof(id));
+ write_stringz(out, "E");
+ write_stringz(out, STREAMS);
+ write_stringz(out, "disposed");
+
+ json_write_string(out, stream->type);
+ write_stream(out, 0);
+ json_write_string(out, id);
+ write_stream(out, 0);
+ write_stream(out, MARKER_EOM);
+}
+
+static void delete_read_request(ReadRequest * r) {
+ Channel * c = r->client->channel;
+ Trap trap;
+
+ if (set_trap(&trap)) {
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, r->token);
+ write_stringz(&c->out, "null");
+ write_errno(&c->out, ERR_COMMAND_CANCELLED);
+ json_write_long(&c->out, 0);
+ write_stream(&c->out, 0);
+ json_write_boolean(&c->out, 1);
+ write_stream(&c->out, 0);
+ write_stream(&c->out, MARKER_EOM);
+ clear_trap(&trap);
+ }
+ else {
+ trace(LOG_ALWAYS, "Exception handling pending stream read command: %d %s",
+ trap.error, errno_to_str(trap.error));
+ }
+ list_remove(&r->link_client);
+ loc_free(r);
+}
+
+static void delete_write_request(WriteRequest * r, int error) {
+ Channel * c = r->client->channel;
+ Trap trap;
+
+ if (set_trap(&trap)) {
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, r->token);
+ write_errno(&c->out, error);
+ write_stream(&c->out, MARKER_EOM);
+ clear_trap(&trap);
+ }
+ else {
+ trace(LOG_ALWAYS, "Exception handling pending stream write command: %d %s",
+ trap.error, errno_to_str(trap.error));
+ }
+ list_remove(&r->link_client);
+ loc_free(r->data);
+ loc_free(r);
+}
+
+static void delete_stream(void * args) {
+ VirtualStream * stream = (VirtualStream *)args;
+
+ assert(stream->magic == STREAM_MAGIC);
+ assert(list_is_empty(&stream->clients));
+ assert(stream->deleted);
+ stream->magic = 0;
+ list_remove(&stream->link_all);
+ loc_free(stream->buf);
+ loc_free(stream);
+}
+
+static void notify_data_available(void * args) {
+ VirtualStream * stream = (VirtualStream *)args;
+ assert(stream->magic == STREAM_MAGIC);
+ stream->data_available_posted = 0;
+ if (stream->deleted || stream->eos_out) return;
+ stream->callback(stream, VS_EVENT_DATA_AVAILABLE, stream->callback_args);
+}
+
+static void notify_space_available(void * args) {
+ VirtualStream * stream = (VirtualStream *)args;
+ assert(stream->magic == STREAM_MAGIC);
+ stream->space_available_posted = 0;
+ if (stream->deleted || stream->eos_inp) return;
+ stream->callback(stream, VS_EVENT_SPACE_AVAILABLE, stream->callback_args);
+}
+
+static void advance_stream_buffer(VirtualStream * stream) {
+ size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len;
+ uint64_t min_pos = ~(uint64_t)0;
+ uint64_t buf_pos = stream->pos - len;
+ LINK * l;
+
+ assert(stream->access & VS_ENABLE_REMOTE_READ);
+ for (l = stream->clients.next; l != &stream->clients; l = l->next) {
+ StreamClient * client = stream2client(l);
+ assert(client->pos <= stream->pos);
+ if (client->pos < min_pos) min_pos = client->pos;
+ }
+ if (min_pos == ~(uint64_t)0) {
+ stream->buf_out = stream->buf_inp;
+ }
+ else if (min_pos > buf_pos) {
+ assert(min_pos - buf_pos <= len);
+ stream->buf_out = (stream->buf_out + (unsigned)(min_pos - buf_pos)) % stream->buf_len;
+ }
+ else if (stream->pos - min_pos >= stream->buf_len) {
+ /* TODO: drop stream data */
+ assert(0);
+ }
+ if (len != (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len &&
+ !stream->space_available_posted) {
+ post_event(notify_space_available, stream);
+ stream->space_available_posted = 1;
+ }
+}
+
+static StreamClient * create_client(VirtualStream * stream, Channel * channel) {
+ size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len;
+ StreamClient * client = (StreamClient *)loc_alloc_zero(sizeof(StreamClient));
+ list_init(&client->link_hash);
+ list_init(&client->link_stream);
+ list_init(&client->link_all);
+ list_init(&client->read_requests);
+ list_init(&client->write_requests);
+ client->stream = stream;
+ client->channel = channel;
+ client->pos = stream->pos - len;
+ list_add_first(&client->link_hash, &handle_hash[get_client_hash(stream->id, channel)]);
+ list_add_first(&client->link_stream, &stream->clients);
+ list_add_first(&client->link_all, &clients);
+ stream->ref_cnt++;
+ return client;
+}
+
+static void delete_client(StreamClient * client) {
+ VirtualStream * stream = client->stream;
+ Trap trap;
+ LINK * n;
+
+ assert(stream->ref_cnt > 0);
+ if (set_trap(&trap)) {
+ send_event_stream_disposed(&client->channel->out, stream);
+ clear_trap(&trap);
+ }
+ else {
+ trace(LOG_ALWAYS, "Exception sending stream deleted event: %d %s",
+ trap.error, errno_to_str(trap.error));
+ }
+ list_remove(&client->link_hash);
+ list_remove(&client->link_stream);
+ list_remove(&client->link_all);
+ for (n = client->read_requests.next; n != &client->read_requests;) {
+ ReadRequest * r = client2read_request(n);
+ n = n->next;
+ delete_read_request(r);
+ }
+ for (n = client->write_requests.next; n != &client->write_requests;) {
+ WriteRequest * r = client2write_request(n);
+ n = n->next;
+ delete_write_request(r, ERR_COMMAND_CANCELLED);
+ }
+ loc_free(client);
+ if (--stream->ref_cnt == 0) {
+ assert(list_is_empty(&stream->clients));
+ assert(stream->deleted);
+ post_event(delete_stream, stream);
+ }
+ else if (stream->access & VS_ENABLE_REMOTE_READ) {
+ advance_stream_buffer(stream);
+ }
+}
+
+static void delete_subscription(Subscription * s) {
+ list_remove(&s->link_all);
+ loc_free(s);
+}
+
+static void send_read_reply(StreamClient * client, char * token, size_t size) {
+ VirtualStream * stream = client->stream;
+ Channel * c = client->channel;
+ size_t lost = 0;
+ size_t read1 = 0;
+ size_t read2 = 0;
+ int eos = 0;
+ char * data1 = NULL;
+ char * data2 = NULL;
+ size_t pos;
+ size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len;
+
+ assert(len > 0 || stream->eos_inp);
+ assert(client->pos <= stream->pos);
+ if ((uint64_t)len < stream->pos - client->pos) {
+ lost = (long)(stream->pos - client->pos - len);
+ }
+ else {
+ len = (unsigned)(stream->pos - client->pos);
+ }
+ pos = (stream->buf_inp + stream->buf_len - len) % stream->buf_len;
+ if (len > size) len = size;
+ data1 = stream->buf + pos;
+ if (pos + len <= stream->buf_len) {
+ read1 = len;
+ }
+ else {
+ read1 = stream->buf_len - pos;
+ data2 = stream->buf;
+ read2 = len - read1;
+ }
+ assert(read1 + read2 == len);
+ client->pos += lost + read1 + read2;
+ assert(client->pos <= stream->pos);
+ if (client->pos == stream->pos && stream->eos_inp) eos = 1;
+ assert(eos || lost + read1 + read2 > 0);
+
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ if (read1 + read2 > 0) {
+ JsonWriteBinaryState state;
+
+ json_write_binary_start(&state, &c->out, read1 + read2);
+ json_write_binary_data(&state, data1, read1);
+ json_write_binary_data(&state, data2, read2);
+ json_write_binary_end(&state);
+ write_stream(&c->out, 0);
+ }
+ else {
+ write_stringz(&c->out, "null");
+ }
+ write_errno(&c->out, 0);
+ json_write_long(&c->out, lost);
+ write_stream(&c->out, 0);
+ json_write_boolean(&c->out, eos);
+ write_stream(&c->out, 0);
+ write_stream(&c->out, MARKER_EOM);
+}
+
+void virtual_stream_create(const char * type, const char * context_id, size_t buf_len, unsigned access,
+ VirtualStreamCallBack * callback, void * callback_args, VirtualStream ** res) {
+ LINK * l;
+ VirtualStream * stream = (VirtualStream *)loc_alloc_zero(sizeof(VirtualStream));
+
+ buf_len++;
+ list_init(&stream->clients);
+ strlcpy(stream->type, type, sizeof(stream->type));
+ stream->magic = STREAM_MAGIC;
+ stream->id = id_cnt++;
+ stream->access = access;
+ stream->callback = callback;
+ stream->callback_args = callback_args;
+ stream->ref_cnt = 1;
+ stream->buf = (char *)loc_alloc(buf_len);
+ stream->buf_len = buf_len;
+ for (l = subscriptions.next; l != &subscriptions; l = l->next) {
+ Subscription * h = all2subscription(l);
+ if (strcmp(type, h->type) == 0) {
+ Trap trap;
+ create_client(stream, h->channel);
+ if (set_trap(&trap)) {
+ send_event_stream_created(&h->channel->out, stream, context_id);
+ clear_trap(&trap);
+ }
+ else {
+ trace(LOG_ALWAYS, "Exception sending stream created event: %d %s",
+ trap.error, errno_to_str(trap.error));
+ }
+ }
+ }
+ list_add_first(&stream->link_all, &streams);
+ *res = stream;
+}
+
+VirtualStream * virtual_stream_find(char * id) {
+ LINK * l;
+ unsigned n = 0;
+
+ if (str2id(id, &n)) {
+ for (l = streams.next; l != &streams; l = l->next) {
+ VirtualStream * stream = all2stream(l);
+ if (stream->id == n && !stream->deleted) return stream;
+ }
+ }
+ errno = ERR_INV_CONTEXT;
+ return NULL;
+}
+
+int virtual_stream_add_data(VirtualStream * stream, char * buf, size_t buf_size, size_t * data_size, int eos) {
+ int err = 0;
+
+ assert(stream->magic == STREAM_MAGIC);
+ if (stream->eos_inp) err = ERR_EOF;
+
+ if (!err) {
+ size_t len = (stream->buf_out + stream->buf_len - stream->buf_inp - 1) % stream->buf_len;
+ assert(len < stream->buf_len);
+ if (buf_size < len) len = buf_size;
+ if (stream->buf_inp + len <= stream->buf_len) {
+ memcpy(stream->buf + stream->buf_inp, buf, len);
+ }
+ else {
+ size_t x = stream->buf_len - stream->buf_inp;
+ size_t y = len - x;
+ memcpy(stream->buf + stream->buf_inp, buf, x);
+ memcpy(stream->buf, buf + x, y);
+ }
+ stream->buf_inp = (stream->buf_inp + len) % stream->buf_len;
+ stream->pos += len;
+ *data_size = len;
+ if (eos && buf_size == len) stream->eos_inp = 1;
+ }
+
+ if (stream->access & VS_ENABLE_REMOTE_READ) {
+ if (!err && (stream->eos_inp || *data_size > 0)) {
+ LINK * l;
+ for (l = stream->clients.next; l != &stream->clients; l = l->next) {
+ StreamClient * client = stream2client(l);
+ while (!list_is_empty(&client->read_requests) && (client->pos < stream->pos || stream->eos_inp)) {
+ ReadRequest * r = client2read_request(client->read_requests.next);
+ list_remove(&r->link_client);
+ send_read_reply(client, r->token, r->size);
+ loc_free(r);
+ }
+ }
+ advance_stream_buffer(stream);
+ }
+ }
+ else if (!stream->data_available_posted) {
+ post_event(notify_data_available, stream);
+ stream->data_available_posted = 1;
+ }
+
+ errno = err;
+ return err ? -1 : 0;
+}
+
+int virtual_stream_get_data(VirtualStream * stream, char * buf, size_t buf_size, size_t * data_size, int * eos) {
+ size_t len;
+
+ assert(stream->magic == STREAM_MAGIC);
+ len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len;
+
+ if (len > buf_size) {
+ len = buf_size;
+ *eos = 0;
+ }
+ else {
+ *eos = stream->eos_inp;
+ }
+ *data_size = len;
+ if (*eos) stream->eos_out = 1;
+ if (stream->buf_out + len <= stream->buf_len) {
+ memcpy(buf, stream->buf + stream->buf_out, len);
+ }
+ else {
+ size_t x = stream->buf_len - stream->buf_out;
+ size_t y = len - x;
+ memcpy(buf, stream->buf + stream->buf_out, x);
+ memcpy(buf + x, stream->buf, y);
+ }
+ if (stream->access & VS_ENABLE_REMOTE_WRITE) {
+ LINK * l;
+ for (l = stream->clients.next; l != &stream->clients; l = l->next) {
+ StreamClient * client = stream2client(l);
+ if (!list_is_empty(&client->write_requests)) {
+ WriteRequest * r = client2write_request(client->write_requests.next);
+ size_t done = 0;
+ int error = 0;
+ if (virtual_stream_add_data(client->stream, r->data + r->offs,
+ r->size - r->offs, &done, r->eos) < 0) error = errno;
+ r->offs += done;
+ if (error || r->offs >= r->size) {
+ delete_write_request(r, error);
+ }
+ while (error && !list_is_empty(&client->write_requests)) {
+ r = client2write_request(client->write_requests.next);
+ delete_write_request(r, ERR_COMMAND_CANCELLED);
+ }
+ }
+ }
+ }
+ if ((stream->access & VS_ENABLE_REMOTE_READ) == 0 && len > 0) {
+ stream->buf_out = (stream->buf_out + len) % stream->buf_len;
+ assert(!*eos || stream->buf_out == stream->buf_inp);
+ if (!stream->space_available_posted) {
+ post_event(notify_space_available, stream);
+ stream->space_available_posted = 1;
+ }
+ }
+ return 0;
+}
+
+int virtual_stream_is_empty(VirtualStream * stream) {
+ assert(stream->magic == STREAM_MAGIC);
+ assert(!stream->deleted);
+ return stream->buf_out == stream->buf_inp;
+}
+
+void virtual_stream_delete(VirtualStream * stream) {
+ assert(stream->magic == STREAM_MAGIC);
+ assert(!stream->deleted);
+ stream->deleted = 1;
+ if (--stream->ref_cnt > 0) return;
+ assert(list_is_empty(&stream->clients));
+ post_event(delete_stream, stream);
+}
+
+static void command_subscribe(char * token, Channel * c) {
+ char type[256];
+ int err = 0;
+ LINK * l;
+
+ json_read_string(&c->inp, type, sizeof(type));
+ if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
+ if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX);
+
+ for (l = subscriptions.next; l != &subscriptions;) {
+ Subscription * h = all2subscription(l);
+ l = l->next;
+ if (h->channel == c && strcmp(type, h->type) == 0) {
+ err = set_errno(ERR_OTHER, "Already subscribed");
+ break;
+ }
+ }
+ if (err == 0) {
+ Subscription * s = (Subscription *)loc_alloc_zero(sizeof(Subscription));
+ list_init(&s->link_all);
+ list_add_first(&s->link_all, &subscriptions);
+ strlcpy(s->type, type, sizeof(s->type));
+ s->channel = c;
+ }
+
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ write_errno(&c->out, err);
+ write_stream(&c->out, MARKER_EOM);
+}
+
+static void command_unsubscribe(char * token, Channel * c) {
+ char type[256];
+ int err = 0;
+ Subscription * s = NULL;
+ LINK * l;
+
+ json_read_string(&c->inp, type, sizeof(type));
+ if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
+ if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX);
+
+ for (l = subscriptions.next; l != &subscriptions;) {
+ Subscription * h = all2subscription(l);
+ l = l->next;
+ if (h->channel == c && strcmp(type, h->type) == 0) {
+ s = h;
+ break;
+ }
+ }
+ if (s == NULL) err = ERR_INV_CONTEXT;
+ if (err == 0) delete_subscription(s);
+
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ write_errno(&c->out, err);
+ write_stream(&c->out, MARKER_EOM);
+}
+
+static void command_read(char * token, Channel * c) {
+ char id[256];
+ size_t size = 0;
+ StreamClient * client = NULL;
+ int err = 0;
+
+ json_read_string(&c->inp, id, sizeof(id));
+ if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
+ size = json_read_ulong(&c->inp);
+ if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
+ if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX);
+
+ client = find_client(id, c);
+ if (client == NULL) err = errno;
+ if (!err && (client->stream->access & VS_ENABLE_REMOTE_READ) == 0) err = ERR_UNSUPPORTED;
+
+ if (err == 0) {
+ VirtualStream * stream = client->stream;
+ if (client->pos == stream->pos && !stream->eos_inp) {
+ ReadRequest * r = (ReadRequest *)loc_alloc_zero(sizeof(ReadRequest));
+ list_init(&r->link_client);
+ r->client = client;
+ r->size = size;
+ strlcpy(r->token, token, sizeof(r->token));
+ list_add_last(&r->link_client, &client->read_requests);
+ }
+ else {
+ assert(list_is_empty(&client->read_requests));
+ assert(client->channel == c);
+ send_read_reply(client, token, size);
+ advance_stream_buffer(stream);
+ }
+ }
+ else {
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ write_stringz(&c->out, "null");
+ write_errno(&c->out, err);
+ json_write_long(&c->out, 0);
+ write_stream(&c->out, 0);
+ json_write_boolean(&c->out, 0);
+ write_stream(&c->out, 0);
+ write_stream(&c->out, MARKER_EOM);
+ }
+}
+
+static void command_write(char * token, Channel * c) {
+ char id[256];
+ StreamClient * client = NULL;
+ long size = 0;
+ long offs = 0;
+ char * data = NULL;
+ int err = 0;
+
+ json_read_string(&c->inp, id, sizeof(id));
+ if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
+ size = json_read_long(&c->inp);
+ if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
+
+ client = find_client(id, c);
+ if (client == NULL) err = errno;
+ if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED;
+
+ {
+ JsonReadBinaryState state;
+ size_t data_pos = 0;
+
+ if (!err && !list_is_empty(&client->write_requests)) data = (char *)loc_alloc(size);
+
+ json_read_binary_start(&state, &c->inp);
+ for (;;) {
+ if (data != NULL) {
+ size_t rd = json_read_binary_data(&state, data + data_pos, size - offs - data_pos);
+ if (rd == 0) break;
+ data_pos += rd;
+ }
+ else {
+ char buf[256];
+ size_t rd = json_read_binary_data(&state, buf, sizeof(buf));
+ if (rd == 0) break;
+ if (!err) {
+ size_t done = 0;
+ if (virtual_stream_add_data(client->stream, buf, rd, &done, 0) < 0) err = errno;
+ assert(done <= rd);
+ offs += done;
+ if (!err && done < rd) {
+ data = (char *)loc_alloc(size - offs);
+ memcpy(data, buf + done, rd - done);
+ data_pos = rd - done;
+ }
+ }
+ }
+ }
+ json_read_binary_end(&state);
+ }
+
+ if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
+ if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX);
+
+ if (data != NULL) {
+ WriteRequest * r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest));
+ list_init(&r->link_client);
+ r->client = client;
+ r->data = data;
+ r->size = size - offs;
+ strlcpy(r->token, token, sizeof(r->token));
+ list_add_last(&r->link_client, &client->write_requests);
+ }
+ else {
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ write_errno(&c->out, err);
+ write_stream(&c->out, MARKER_EOM);
+ }
+}
+
+static void command_eos(char * token, Channel * c) {
+ char id[256];
+ StreamClient * client = NULL;
+ size_t done = 0;
+ WriteRequest * r = NULL;
+ int err = 0;
+
+ json_read_string(&c->inp, id, sizeof(id));
+ if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
+ if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX);
+
+ client = find_client(id, c);
+ if (client == NULL) err = errno;
+ if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED;
+ if (!err && !list_is_empty(&client->write_requests)) r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest));
+ if (!err && r == NULL && virtual_stream_add_data(client->stream, NULL, 0, &done, 1) < 0) err = errno;
+
+ if (r != NULL) {
+ list_init(&r->link_client);
+ r->client = client;
+ r->eos = 1;
+ strlcpy(r->token, token, sizeof(r->token));
+ list_add_last(&r->link_client, &client->write_requests);
+ }
+ else {
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ write_errno(&c->out, err);
+ write_stream(&c->out, MARKER_EOM);
+ }
+}
+
+static void command_connect(char * token, Channel * c) {
+ char id[256];
+ int err = 0;
+
+ json_read_string(&c->inp, id, sizeof(id));
+ if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
+ if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX);
+
+ if (find_client(id, c) == NULL) {
+ VirtualStream * stream = virtual_stream_find(id);
+ if (stream == NULL) err = errno;
+ else create_client(stream, c);
+ }
+
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ write_errno(&c->out, err);
+ write_stream(&c->out, MARKER_EOM);
+}
+
+static void command_disconnect(char * token, Channel * c) {
+ char id[256];
+ StreamClient * client = NULL;
+ int err = 0;
+
+ json_read_string(&c->inp, id, sizeof(id));
+ if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
+ if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX);
+
+ client = find_client(id, c);
+ if (client == NULL) err = errno;
+ if (!err) delete_client(client);
+
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ write_errno(&c->out, err);
+ write_stream(&c->out, MARKER_EOM);
+}
+
+static void channel_close_listener(Channel * c) {
+ LINK * l = NULL;
+
+ for (l = clients.next; l != &clients;) {
+ StreamClient * client = all2client(l);
+ l = l->next;
+ if (client->channel == c) {
+ trace(LOG_ALWAYS, "Stream is left connected by client: VS%d", client->stream->id);
+ delete_client(client);
+ }
+ }
+
+ for (l = subscriptions.next; l != &subscriptions;) {
+ Subscription * h = all2subscription(l);
+ l = l->next;
+ if (h->channel == c) {
+ delete_subscription(h);
+ }
+ }
+}
+
+void ini_streams_service(Protocol * proto) {
+ int i;
+
+ list_init(&clients);
+ list_init(&streams);
+ list_init(&subscriptions);
+ for (i = 0; i < HANDLE_HASH_SIZE; i++) {
+ list_init(&handle_hash[i]);
+ }
+
+ add_channel_close_listener(channel_close_listener);
+
+ add_command_handler(proto, STREAMS, "subscribe", command_subscribe);
+ add_command_handler(proto, STREAMS, "unsubscribe", command_unsubscribe);
+ add_command_handler(proto, STREAMS, "read", command_read);
+ add_command_handler(proto, STREAMS, "write", command_write);
+ add_command_handler(proto, STREAMS, "eos", command_eos);
+ add_command_handler(proto, STREAMS, "connect", command_connect);
+ add_command_handler(proto, STREAMS, "disconnect", command_disconnect);
+}
+
+#endif /* SERVICE_Streams */

Back to the top