Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--agent/tcf/services/streamsservice.c195
-rw-r--r--agent/tcf/services/streamsservice.h8
2 files changed, 149 insertions, 54 deletions
diff --git a/agent/tcf/services/streamsservice.c b/agent/tcf/services/streamsservice.c
index ea34a715..f295ccdf 100644
--- a/agent/tcf/services/streamsservice.c
+++ b/agent/tcf/services/streamsservice.c
@@ -572,14 +572,9 @@ void virtual_stream_delete(VirtualStream * stream) {
post_event(delete_stream, stream);
}
-static void command_subscribe(char * token, Channel * c) {
- char type[256];
- int err = 0;
+int virtual_stream_subscribe (Channel * c, const char * type) {
LINK * l;
-
- json_read_string(&c->inp, type, sizeof(type));
- json_test_char(&c->inp, MARKER_EOA);
- json_test_char(&c->inp, MARKER_EOM);
+ int err = 0;
for (l = subscriptions.next; l != &subscriptions;) {
Subscription * h = all2subscription(l);
@@ -596,23 +591,32 @@ static void command_subscribe(char * token, Channel * c) {
strlcpy(s->type, type, sizeof(s->type));
s->channel = c;
}
+ else errno = err;
- write_stringz(&c->out, "R");
- write_stringz(&c->out, token);
- write_errno(&c->out, err);
- write_stream(&c->out, MARKER_EOM);
+ return err == 0 ? 0 : -1;
}
-static void command_unsubscribe(char * token, Channel * c) {
+static void command_subscribe(char * token, Channel * c) {
char type[256];
int err = 0;
- Subscription * s = NULL;
- LINK * l;
json_read_string(&c->inp, type, sizeof(type));
json_test_char(&c->inp, MARKER_EOA);
json_test_char(&c->inp, MARKER_EOM);
+ if (virtual_stream_subscribe(c, type) < 0) err = errno;
+
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ write_errno(&c->out, err);
+ write_stream(&c->out, MARKER_EOM);
+}
+
+int virtual_stream_unsubscribe (Channel * c, const char * type) {
+ LINK * l;
+ int err = 0;
+ Subscription * s = NULL;
+
for (l = subscriptions.next; l != &subscriptions;) {
Subscription * h = all2subscription(l);
l = l->next;
@@ -623,6 +627,20 @@ static void command_unsubscribe(char * token, Channel * c) {
}
if (s == NULL) err = ERR_INV_CONTEXT;
if (err == 0) delete_subscription(s);
+ else errno = err;
+
+ return err == 0 ? 0 : -1;
+}
+
+static void command_unsubscribe(char * token, Channel * c) {
+ char type[256];
+ int err = 0;
+
+ json_read_string(&c->inp, type, sizeof(type));
+ json_test_char(&c->inp, MARKER_EOA);
+ json_test_char(&c->inp, MARKER_EOM);
+
+ if (virtual_stream_unsubscribe(c, type) < 0) err = errno;
write_stringz(&c->out, "R");
write_stringz(&c->out, token);
@@ -630,19 +648,10 @@ static void command_unsubscribe(char * token, Channel * c) {
write_stream(&c->out, MARKER_EOM);
}
-static void command_read(char * token, Channel * c) {
- char id[256];
- size_t size;
- StreamClient * client;
+int virtual_stream_read (Channel * c, char * token, char * id, size_t size) {
int err = 0;
+ StreamClient * client= find_client(id, c);
- json_read_string(&c->inp, id, sizeof(id));
- json_test_char(&c->inp, MARKER_EOA);
- size = json_read_ulong(&c->inp);
- json_test_char(&c->inp, MARKER_EOA);
- json_test_char(&c->inp, MARKER_EOM);
-
- client = find_client(id, c);
if (client == NULL) err = errno;
if (!err && (client->stream->access & VS_ENABLE_REMOTE_READ) == 0) err = ERR_UNSUPPORTED;
@@ -663,7 +672,30 @@ static void command_read(char * token, Channel * c) {
advance_stream_buffer(stream);
}
}
- else {
+ else errno = err;
+
+ return err == 0 ? 0 : -1;
+}
+
+static void command_read(char * token, Channel * c) {
+ char id[256];
+ size_t size;
+ int err = 0;
+
+ json_read_string(&c->inp, id, sizeof(id));
+ json_test_char(&c->inp, MARKER_EOA);
+ size = json_read_ulong(&c->inp);
+ json_test_char(&c->inp, MARKER_EOA);
+ json_test_char(&c->inp, MARKER_EOM);
+
+ if (virtual_stream_read (c, token, id, size) < 0) err = errno;
+
+ if (err != 0) {
+ /*
+ * Handle reply with an error. If none error was detected, the reply
+ * was sent back by virtual_stream_read() or delayed.
+ */
+
write_stringz(&c->out, "R");
write_stringz(&c->out, token);
write_stringz(&c->out, "null");
@@ -676,20 +708,12 @@ static void command_read(char * token, Channel * c) {
}
}
-static void command_write(char * token, Channel * c) {
- char id[256];
- StreamClient * client;
- long size;
- long offs = 0;
+int virtual_stream_write (Channel * c, char * token, char * id, size_t size, InputStream * inp) {
char * data = NULL;
int err = 0;
+ long offs = 0;
+ StreamClient * client = find_client(id, c);
- json_read_string(&c->inp, id, sizeof(id));
- json_test_char(&c->inp, MARKER_EOA);
- size = json_read_long(&c->inp);
- json_test_char(&c->inp, MARKER_EOA);
-
- client = find_client(id, c);
if (client == NULL) err = errno;
if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED;
@@ -699,7 +723,7 @@ static void command_write(char * token, Channel * c) {
if (!err && !list_is_empty(&client->write_requests)) data = (char *)loc_alloc(size);
- json_read_binary_start(&state, &c->inp);
+ json_read_binary_start(&state, inp);
for (;;) {
if (data != NULL) {
size_t rd = json_read_binary_data(&state, data + data_pos, size - offs - data_pos);
@@ -726,9 +750,6 @@ static void command_write(char * token, Channel * c) {
json_read_binary_end(&state);
}
- json_test_char(&c->inp, MARKER_EOA);
- json_test_char(&c->inp, MARKER_EOM);
-
if (data != NULL) {
WriteRequest * r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest));
list_init(&r->link_client);
@@ -738,26 +759,51 @@ static void command_write(char * token, Channel * c) {
strlcpy(r->token, token, sizeof(r->token));
list_add_last(&r->link_client, &client->write_requests);
}
- else {
+ else if (err == 0) {
write_stringz(&c->out, "R");
write_stringz(&c->out, token);
write_errno(&c->out, err);
write_stream(&c->out, MARKER_EOM);
}
+
+ if (err != 0) errno = err;
+
+ return err == 0 ? 0 : -1;
}
-static void command_eos(char * token, Channel * c) {
+static void command_write(char * token, Channel * c) {
char id[256];
- StreamClient * client;
- size_t done = 0;
- WriteRequest * r = NULL;
+ long size;
int err = 0;
-
json_read_string(&c->inp, id, sizeof(id));
json_test_char(&c->inp, MARKER_EOA);
+ size = json_read_long(&c->inp);
+ json_test_char(&c->inp, MARKER_EOA);
+
+ if (virtual_stream_write(c, token, id, size, &c->inp) < 0) err = errno;
+
+ json_test_char(&c->inp, MARKER_EOA);
json_test_char(&c->inp, MARKER_EOM);
- client = find_client(id, c);
+ if (err != 0) {
+ /*
+ * Handle reply with an error. If none error was detected, the reply
+ * was sent back by virtual_stream_write() or delayed.
+ */
+
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ write_errno(&c->out, err);
+ write_stream(&c->out, MARKER_EOM);
+ }
+}
+
+int virtual_stream_eos (Channel * c, char * token, char * id) {
+ size_t done = 0;
+ WriteRequest * r = NULL;
+ int err = 0;
+ StreamClient * client = find_client(id, c);
+
if (client == NULL) err = errno;
if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED;
if (!err && !list_is_empty(&client->write_requests)) r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest));
@@ -770,15 +816,19 @@ static void command_eos(char * token, Channel * c) {
strlcpy(r->token, token, sizeof(r->token));
list_add_last(&r->link_client, &client->write_requests);
}
- else {
+ else if (err == 0) {
write_stringz(&c->out, "R");
write_stringz(&c->out, token);
write_errno(&c->out, err);
write_stream(&c->out, MARKER_EOM);
}
+
+ if (err != 0) errno = err;
+
+ return err == 0 ? 0 : -1;
}
-static void command_connect(char * token, Channel * c) {
+static void command_eos(char * token, Channel * c) {
char id[256];
int err = 0;
@@ -786,30 +836,67 @@ static void command_connect(char * token, Channel * c) {
json_test_char(&c->inp, MARKER_EOA);
json_test_char(&c->inp, MARKER_EOM);
+ if (virtual_stream_eos(c, token, id) < 0) err = errno;
+
+ if (err != 0) {
+ /*
+ * Handle reply with an error. If none error was detected, the reply
+ * was sent back by virtual_stream_eos() or delayed.
+ */
+ write_stringz(&c->out, "R");
+ write_stringz(&c->out, token);
+ write_errno(&c->out, err);
+ write_stream(&c->out, MARKER_EOM);
+ }
+}
+
+int virtual_stream_connect (Channel * c, char * token, char * id) {
+ int err = 0;
+
if (find_client(id, c) == NULL) {
VirtualStream * stream = virtual_stream_find(id);
if (stream == NULL) err = errno;
else create_client(stream, c);
}
+ return err == 0 ? 0 : -1;
+}
+
+static void command_connect(char * token, Channel * c) {
+ char id[256];
+ int err = 0;
+
+ json_read_string(&c->inp, id, sizeof(id));
+ json_test_char(&c->inp, MARKER_EOA);
+ json_test_char(&c->inp, MARKER_EOM);
+
+ if (virtual_stream_connect (c, token, id) < 0) err = errno;
+
write_stringz(&c->out, "R");
write_stringz(&c->out, token);
write_errno(&c->out, err);
write_stream(&c->out, MARKER_EOM);
}
+int virtual_stream_disconnect (Channel * c, char * token, char * id) {
+ int err = 0;
+ StreamClient * client = find_client(id, c);
+
+ if (client == NULL) err = errno;
+ if (!err) delete_client(client);
+
+ return err == 0 ? 0 : -1;
+}
+
static void command_disconnect(char * token, Channel * c) {
char id[256];
- StreamClient * client;
int err = 0;
json_read_string(&c->inp, id, sizeof(id));
json_test_char(&c->inp, MARKER_EOA);
json_test_char(&c->inp, MARKER_EOM);
- client = find_client(id, c);
- if (client == NULL) err = errno;
- if (!err) delete_client(client);
+ if (virtual_stream_disconnect (c, token, id) < 0) err = errno;
write_stringz(&c->out, "R");
write_stringz(&c->out, token);
diff --git a/agent/tcf/services/streamsservice.h b/agent/tcf/services/streamsservice.h
index a24f906d..f598a079 100644
--- a/agent/tcf/services/streamsservice.h
+++ b/agent/tcf/services/streamsservice.h
@@ -43,6 +43,14 @@ extern int virtual_stream_add_data(VirtualStream * stream, char * buf, size_t bu
extern int virtual_stream_get_data(VirtualStream * stream, char * buf, size_t buf_size, size_t * data_size, int * eos);
extern int virtual_stream_is_empty(VirtualStream * stream);
+extern int virtual_stream_eos (Channel * c, char * token, char * id);
+extern int virtual_stream_write (Channel * c, char * token, char * id, size_t size, InputStream * inp);
+extern int virtual_stream_read (Channel * c, char * token, char * id, size_t size);
+extern int virtual_stream_unsubscribe (Channel * c, const char * type);
+extern int virtual_stream_subscribe (Channel * c, const char * type);
+extern int virtual_stream_connect (Channel * c, char * token, char * id);
+extern int virtual_stream_disconnect (Channel * c, char * token, char * id);
+
/*
* Initialize streams service.
*/

Back to the top