diff options
author | Eugene Tarassov | 2011-11-11 01:19:02 +0000 |
---|---|---|
committer | Eugene Tarassov | 2011-11-11 01:19:02 +0000 |
commit | a107edc216f94c5ec059fdf74541908bc7e979bb (patch) | |
tree | a0d9a448043cf5b969c2ede059b7d19a3aa64b71 /services/streamsservice.c | |
parent | 07c7a005cded9e4b4ec842d756d0e4d79dade149 (diff) | |
download | org.eclipse.tcf.agent-a107edc216f94c5ec059fdf74541908bc7e979bb.tar.gz org.eclipse.tcf.agent-a107edc216f94c5ec059fdf74541908bc7e979bb.tar.xz org.eclipse.tcf.agent-a107edc216f94c5ec059fdf74541908bc7e979bb.zip |
Agent code is moved into separate "agent" directory, all C code moved into "tcf" directory.
Diffstat (limited to 'services/streamsservice.c')
-rw-r--r-- | services/streamsservice.c | 856 |
1 files changed, 0 insertions, 856 deletions
diff --git a/services/streamsservice.c b/services/streamsservice.c deleted file mode 100644 index 0414e5d3..00000000 --- a/services/streamsservice.c +++ /dev/null @@ -1,856 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2009, 2010 Wind River Systems, Inc. and others. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * You may elect to redistribute this code under either of these licenses. - * - * Contributors: - * Wind River Systems - initial API and implementation - *******************************************************************************/ - -/* - * TCF Streams - generic streams service. - */ - -#include <config.h> - -#if SERVICE_Streams - -#include <assert.h> -#include <string.h> -#include <framework/channel.h> -#include <framework/exceptions.h> -#include <framework/json.h> -#include <framework/errors.h> -#include <framework/trace.h> -#include <framework/events.h> -#include <framework/myalloc.h> -#include <services/streamsservice.h> - -static const char * STREAMS = "Streams"; - -typedef struct StreamClient StreamClient; -typedef struct ReadRequest ReadRequest; -typedef struct WriteRequest WriteRequest; -typedef struct Subscription Subscription; - -#define STREAM_MAGIC 0x29465398 - -struct VirtualStream { - LINK link_all; - unsigned magic; - char type[256]; - unsigned id; - unsigned access; - VirtualStreamCallBack * callback; - void * callback_args; - int ref_cnt; - int deleted; - LINK clients; - uint64_t pos; - char * buf; - size_t buf_len; - size_t buf_inp; - size_t buf_out; - unsigned eos_inp; - unsigned eos_out; - unsigned data_available_posted; - unsigned space_available_posted; -}; - -struct StreamClient { - LINK link_hash; - LINK link_stream; - LINK link_all; - LINK read_requests; - LINK write_requests; - VirtualStream * stream; - Channel * channel; - uint64_t pos; -}; - -struct ReadRequest { - LINK link_client; - StreamClient * client; - char token[256]; - size_t size; -}; - -struct WriteRequest { - LINK link_client; - StreamClient * client; - char token[256]; - char * data; - size_t offs; - size_t size; - int eos; -}; - -struct Subscription { - LINK link_all; - char type[256]; - Channel * channel; -}; - -#define hash2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_hash))) -#define stream2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_stream))) -#define all2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_all))) -#define all2subscription(A) ((Subscription *)((char *)(A) - offsetof(Subscription, link_all))) -#define all2stream(A) ((VirtualStream *)((char *)(A) - offsetof(VirtualStream, link_all))) -#define client2read_request(A) ((ReadRequest *)((char *)(A) - offsetof(ReadRequest, link_client))) -#define client2write_request(A) ((WriteRequest *)((char *)(A) - offsetof(WriteRequest, link_client))) - -#define HANDLE_HASH_SIZE (4 * MEM_USAGE_FACTOR - 1) -static LINK handle_hash[HANDLE_HASH_SIZE]; -static LINK clients; -static LINK streams; -static LINK subscriptions; -static unsigned id_cnt = 0; - -static unsigned get_client_hash(unsigned id, Channel * c) { - return (id + (unsigned)(uintptr_t)c) % HANDLE_HASH_SIZE; -} - -static int str2id(char * s, unsigned * id) { - char * p = NULL; - if (*s++ != 'V') return 0; - if (*s++ != 'S') return 0; - *id = (unsigned)strtoul(s, &p, 10); - return *p == 0; -} - -void virtual_stream_get_id(VirtualStream * stream, char * id_buf, size_t buf_size) { - snprintf(id_buf, buf_size, "VS%u", stream->id); -} - -static StreamClient * find_client(char * s, Channel * c) { - unsigned id = 0; - if (str2id(s, &id)) { - unsigned h = get_client_hash(id, c); - LINK * l = handle_hash[h].next; - while (l != &handle_hash[h]) { - StreamClient * client = hash2client(l); - if (client->stream->id == id && client->channel == c) return client; - l = l->next; - } - } - errno = set_fmt_errno(ERR_OTHER, "No such stream: %s", s); - return NULL; -} - -static void send_event_stream_created(OutputStream * out, VirtualStream * stream, const char * context_id) { - char id[256]; - - virtual_stream_get_id(stream, id, sizeof(id)); - write_stringz(out, "E"); - write_stringz(out, STREAMS); - write_stringz(out, "created"); - - json_write_string(out, stream->type); - write_stream(out, 0); - json_write_string(out, id); - write_stream(out, 0); - json_write_string(out, context_id); - write_stream(out, 0); - write_stream(out, MARKER_EOM); -} - -static void send_event_stream_disposed(OutputStream * out, VirtualStream * stream) { - char id[256]; - - virtual_stream_get_id(stream, id, sizeof(id)); - write_stringz(out, "E"); - write_stringz(out, STREAMS); - write_stringz(out, "disposed"); - - json_write_string(out, stream->type); - write_stream(out, 0); - json_write_string(out, id); - write_stream(out, 0); - write_stream(out, MARKER_EOM); -} - -static void delete_read_request(ReadRequest * r) { - Channel * c = r->client->channel; - Trap trap; - - if (set_trap(&trap)) { - write_stringz(&c->out, "R"); - write_stringz(&c->out, r->token); - write_stringz(&c->out, "null"); - write_errno(&c->out, ERR_COMMAND_CANCELLED); - json_write_long(&c->out, 0); - write_stream(&c->out, 0); - json_write_boolean(&c->out, 1); - write_stream(&c->out, 0); - write_stream(&c->out, MARKER_EOM); - clear_trap(&trap); - } - else { - trace(LOG_ALWAYS, "Exception handling pending stream read command: %d %s", - trap.error, errno_to_str(trap.error)); - } - list_remove(&r->link_client); - loc_free(r); -} - -static void delete_write_request(WriteRequest * r, int error) { - Channel * c = r->client->channel; - Trap trap; - - if (set_trap(&trap)) { - write_stringz(&c->out, "R"); - write_stringz(&c->out, r->token); - write_errno(&c->out, error); - write_stream(&c->out, MARKER_EOM); - clear_trap(&trap); - } - else { - trace(LOG_ALWAYS, "Exception handling pending stream write command: %d %s", - trap.error, errno_to_str(trap.error)); - } - list_remove(&r->link_client); - loc_free(r->data); - loc_free(r); -} - -static void delete_stream(void * args) { - VirtualStream * stream = (VirtualStream *)args; - - assert(stream->magic == STREAM_MAGIC); - assert(list_is_empty(&stream->clients)); - assert(stream->deleted); - stream->magic = 0; - list_remove(&stream->link_all); - loc_free(stream->buf); - loc_free(stream); -} - -static void notify_data_available(void * args) { - VirtualStream * stream = (VirtualStream *)args; - assert(stream->magic == STREAM_MAGIC); - stream->data_available_posted = 0; - if (stream->deleted || stream->eos_out) return; - stream->callback(stream, VS_EVENT_DATA_AVAILABLE, stream->callback_args); -} - -static void notify_space_available(void * args) { - VirtualStream * stream = (VirtualStream *)args; - assert(stream->magic == STREAM_MAGIC); - stream->space_available_posted = 0; - if (stream->deleted || stream->eos_inp) return; - stream->callback(stream, VS_EVENT_SPACE_AVAILABLE, stream->callback_args); -} - -static void advance_stream_buffer(VirtualStream * stream) { - size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; - uint64_t min_pos = ~(uint64_t)0; - uint64_t buf_pos = stream->pos - len; - LINK * l; - - assert(stream->access & VS_ENABLE_REMOTE_READ); - for (l = stream->clients.next; l != &stream->clients; l = l->next) { - StreamClient * client = stream2client(l); - assert(client->pos <= stream->pos); - if (client->pos < min_pos) min_pos = client->pos; - } - if (min_pos == ~(uint64_t)0) { - stream->buf_out = stream->buf_inp; - } - else if (min_pos > buf_pos) { - assert(min_pos - buf_pos <= len); - stream->buf_out = (stream->buf_out + (unsigned)(min_pos - buf_pos)) % stream->buf_len; - } - else if (stream->pos - min_pos >= stream->buf_len) { - /* TODO: drop stream data */ - assert(0); - } - if (len != (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len && - !stream->space_available_posted) { - post_event(notify_space_available, stream); - stream->space_available_posted = 1; - } -} - -static StreamClient * create_client(VirtualStream * stream, Channel * channel) { - size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; - StreamClient * client = (StreamClient *)loc_alloc_zero(sizeof(StreamClient)); - list_init(&client->link_hash); - list_init(&client->link_stream); - list_init(&client->link_all); - list_init(&client->read_requests); - list_init(&client->write_requests); - client->stream = stream; - client->channel = channel; - client->pos = stream->pos - len; - list_add_first(&client->link_hash, &handle_hash[get_client_hash(stream->id, channel)]); - list_add_first(&client->link_stream, &stream->clients); - list_add_first(&client->link_all, &clients); - stream->ref_cnt++; - return client; -} - -static void delete_client(StreamClient * client) { - VirtualStream * stream = client->stream; - Trap trap; - LINK * n; - - assert(stream->ref_cnt > 0); - if (set_trap(&trap)) { - send_event_stream_disposed(&client->channel->out, stream); - clear_trap(&trap); - } - else { - trace(LOG_ALWAYS, "Exception sending stream deleted event: %d %s", - trap.error, errno_to_str(trap.error)); - } - list_remove(&client->link_hash); - list_remove(&client->link_stream); - list_remove(&client->link_all); - for (n = client->read_requests.next; n != &client->read_requests;) { - ReadRequest * r = client2read_request(n); - n = n->next; - delete_read_request(r); - } - for (n = client->write_requests.next; n != &client->write_requests;) { - WriteRequest * r = client2write_request(n); - n = n->next; - delete_write_request(r, ERR_COMMAND_CANCELLED); - } - loc_free(client); - if (--stream->ref_cnt == 0) { - assert(list_is_empty(&stream->clients)); - assert(stream->deleted); - post_event(delete_stream, stream); - } - else if (stream->access & VS_ENABLE_REMOTE_READ) { - advance_stream_buffer(stream); - } -} - -static void delete_subscription(Subscription * s) { - list_remove(&s->link_all); - loc_free(s); -} - -static void send_read_reply(StreamClient * client, char * token, size_t size) { - VirtualStream * stream = client->stream; - Channel * c = client->channel; - size_t lost = 0; - size_t read1 = 0; - size_t read2 = 0; - int eos = 0; - char * data1 = NULL; - char * data2 = NULL; - size_t pos; - size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; - - assert(len > 0 || stream->eos_inp); - assert(client->pos <= stream->pos); - if ((uint64_t)len < stream->pos - client->pos) { - lost = (long)(stream->pos - client->pos - len); - } - else { - len = (unsigned)(stream->pos - client->pos); - } - pos = (stream->buf_inp + stream->buf_len - len) % stream->buf_len; - if (len > size) len = size; - data1 = stream->buf + pos; - if (pos + len <= stream->buf_len) { - read1 = len; - } - else { - read1 = stream->buf_len - pos; - data2 = stream->buf; - read2 = len - read1; - } - assert(read1 + read2 == len); - client->pos += lost + read1 + read2; - assert(client->pos <= stream->pos); - if (client->pos == stream->pos && stream->eos_inp) eos = 1; - assert(eos || lost + read1 + read2 > 0); - - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - if (read1 + read2 > 0) { - JsonWriteBinaryState state; - - json_write_binary_start(&state, &c->out, read1 + read2); - json_write_binary_data(&state, data1, read1); - json_write_binary_data(&state, data2, read2); - json_write_binary_end(&state); - write_stream(&c->out, 0); - } - else { - write_stringz(&c->out, "null"); - } - write_errno(&c->out, 0); - json_write_long(&c->out, lost); - write_stream(&c->out, 0); - json_write_boolean(&c->out, eos); - write_stream(&c->out, 0); - write_stream(&c->out, MARKER_EOM); -} - -void virtual_stream_create(const char * type, const char * context_id, size_t buf_len, unsigned access, - VirtualStreamCallBack * callback, void * callback_args, VirtualStream ** res) { - LINK * l; - VirtualStream * stream = (VirtualStream *)loc_alloc_zero(sizeof(VirtualStream)); - - buf_len++; - list_init(&stream->clients); - strlcpy(stream->type, type, sizeof(stream->type)); - stream->magic = STREAM_MAGIC; - stream->id = id_cnt++; - stream->access = access; - stream->callback = callback; - stream->callback_args = callback_args; - stream->ref_cnt = 1; - stream->buf = (char *)loc_alloc(buf_len); - stream->buf_len = buf_len; - for (l = subscriptions.next; l != &subscriptions; l = l->next) { - Subscription * h = all2subscription(l); - if (strcmp(type, h->type) == 0) { - Trap trap; - create_client(stream, h->channel); - if (set_trap(&trap)) { - send_event_stream_created(&h->channel->out, stream, context_id); - clear_trap(&trap); - } - else { - trace(LOG_ALWAYS, "Exception sending stream created event: %d %s", - trap.error, errno_to_str(trap.error)); - } - } - } - list_add_first(&stream->link_all, &streams); - *res = stream; -} - -VirtualStream * virtual_stream_find(char * id) { - LINK * l; - unsigned n = 0; - - if (str2id(id, &n)) { - for (l = streams.next; l != &streams; l = l->next) { - VirtualStream * stream = all2stream(l); - if (stream->id == n && !stream->deleted) return stream; - } - } - errno = ERR_INV_CONTEXT; - return NULL; -} - -int virtual_stream_add_data(VirtualStream * stream, char * buf, size_t buf_size, size_t * data_size, int eos) { - int err = 0; - - assert(stream->magic == STREAM_MAGIC); - if (stream->eos_inp) err = ERR_EOF; - - if (!err) { - size_t len = (stream->buf_out + stream->buf_len - stream->buf_inp - 1) % stream->buf_len; - assert(len < stream->buf_len); - if (buf_size < len) len = buf_size; - if (stream->buf_inp + len <= stream->buf_len) { - memcpy(stream->buf + stream->buf_inp, buf, len); - } - else { - size_t x = stream->buf_len - stream->buf_inp; - size_t y = len - x; - memcpy(stream->buf + stream->buf_inp, buf, x); - memcpy(stream->buf, buf + x, y); - } - stream->buf_inp = (stream->buf_inp + len) % stream->buf_len; - stream->pos += len; - *data_size = len; - if (eos && buf_size == len) stream->eos_inp = 1; - } - - if (stream->access & VS_ENABLE_REMOTE_READ) { - if (!err && (stream->eos_inp || *data_size > 0)) { - LINK * l; - for (l = stream->clients.next; l != &stream->clients; l = l->next) { - StreamClient * client = stream2client(l); - while (!list_is_empty(&client->read_requests) && (client->pos < stream->pos || stream->eos_inp)) { - ReadRequest * r = client2read_request(client->read_requests.next); - list_remove(&r->link_client); - send_read_reply(client, r->token, r->size); - loc_free(r); - } - } - advance_stream_buffer(stream); - } - } - else if (!stream->data_available_posted) { - post_event(notify_data_available, stream); - stream->data_available_posted = 1; - } - - errno = err; - return err ? -1 : 0; -} - -int virtual_stream_get_data(VirtualStream * stream, char * buf, size_t buf_size, size_t * data_size, int * eos) { - size_t len; - - assert(stream->magic == STREAM_MAGIC); - len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; - - if (len > buf_size) { - len = buf_size; - *eos = 0; - } - else { - *eos = stream->eos_inp; - } - *data_size = len; - if (*eos) stream->eos_out = 1; - if (stream->buf_out + len <= stream->buf_len) { - memcpy(buf, stream->buf + stream->buf_out, len); - } - else { - size_t x = stream->buf_len - stream->buf_out; - size_t y = len - x; - memcpy(buf, stream->buf + stream->buf_out, x); - memcpy(buf + x, stream->buf, y); - } - if (stream->access & VS_ENABLE_REMOTE_WRITE) { - LINK * l; - for (l = stream->clients.next; l != &stream->clients; l = l->next) { - StreamClient * client = stream2client(l); - if (!list_is_empty(&client->write_requests)) { - WriteRequest * r = client2write_request(client->write_requests.next); - size_t done = 0; - int error = 0; - if (virtual_stream_add_data(client->stream, r->data + r->offs, - r->size - r->offs, &done, r->eos) < 0) error = errno; - r->offs += done; - if (error || r->offs >= r->size) { - delete_write_request(r, error); - } - while (error && !list_is_empty(&client->write_requests)) { - r = client2write_request(client->write_requests.next); - delete_write_request(r, ERR_COMMAND_CANCELLED); - } - } - } - } - if ((stream->access & VS_ENABLE_REMOTE_READ) == 0 && len > 0) { - stream->buf_out = (stream->buf_out + len) % stream->buf_len; - assert(!*eos || stream->buf_out == stream->buf_inp); - if (!stream->space_available_posted) { - post_event(notify_space_available, stream); - stream->space_available_posted = 1; - } - } - return 0; -} - -int virtual_stream_is_empty(VirtualStream * stream) { - assert(stream->magic == STREAM_MAGIC); - assert(!stream->deleted); - return stream->buf_out == stream->buf_inp; -} - -void virtual_stream_delete(VirtualStream * stream) { - assert(stream->magic == STREAM_MAGIC); - assert(!stream->deleted); - stream->deleted = 1; - if (--stream->ref_cnt > 0) return; - assert(list_is_empty(&stream->clients)); - post_event(delete_stream, stream); -} - -static void command_subscribe(char * token, Channel * c) { - char type[256]; - int err = 0; - LINK * l; - - json_read_string(&c->inp, type, sizeof(type)); - if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); - if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); - - for (l = subscriptions.next; l != &subscriptions;) { - Subscription * h = all2subscription(l); - l = l->next; - if (h->channel == c && strcmp(type, h->type) == 0) { - err = set_errno(ERR_OTHER, "Already subscribed"); - break; - } - } - if (err == 0) { - Subscription * s = (Subscription *)loc_alloc_zero(sizeof(Subscription)); - list_init(&s->link_all); - list_add_first(&s->link_all, &subscriptions); - strlcpy(s->type, type, sizeof(s->type)); - s->channel = c; - } - - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); -} - -static void command_unsubscribe(char * token, Channel * c) { - char type[256]; - int err = 0; - Subscription * s = NULL; - LINK * l; - - json_read_string(&c->inp, type, sizeof(type)); - if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); - if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); - - for (l = subscriptions.next; l != &subscriptions;) { - Subscription * h = all2subscription(l); - l = l->next; - if (h->channel == c && strcmp(type, h->type) == 0) { - s = h; - break; - } - } - if (s == NULL) err = ERR_INV_CONTEXT; - if (err == 0) delete_subscription(s); - - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); -} - -static void command_read(char * token, Channel * c) { - char id[256]; - size_t size = 0; - StreamClient * client = NULL; - int err = 0; - - json_read_string(&c->inp, id, sizeof(id)); - if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); - size = json_read_ulong(&c->inp); - if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); - if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); - - client = find_client(id, c); - if (client == NULL) err = errno; - if (!err && (client->stream->access & VS_ENABLE_REMOTE_READ) == 0) err = ERR_UNSUPPORTED; - - if (err == 0) { - VirtualStream * stream = client->stream; - if (client->pos == stream->pos && !stream->eos_inp) { - ReadRequest * r = (ReadRequest *)loc_alloc_zero(sizeof(ReadRequest)); - list_init(&r->link_client); - r->client = client; - r->size = size; - strlcpy(r->token, token, sizeof(r->token)); - list_add_last(&r->link_client, &client->read_requests); - } - else { - assert(list_is_empty(&client->read_requests)); - assert(client->channel == c); - send_read_reply(client, token, size); - advance_stream_buffer(stream); - } - } - else { - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_stringz(&c->out, "null"); - write_errno(&c->out, err); - json_write_long(&c->out, 0); - write_stream(&c->out, 0); - json_write_boolean(&c->out, 0); - write_stream(&c->out, 0); - write_stream(&c->out, MARKER_EOM); - } -} - -static void command_write(char * token, Channel * c) { - char id[256]; - StreamClient * client = NULL; - long size = 0; - long offs = 0; - char * data = NULL; - int err = 0; - - json_read_string(&c->inp, id, sizeof(id)); - if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); - size = json_read_long(&c->inp); - if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); - - client = find_client(id, c); - if (client == NULL) err = errno; - if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED; - - { - JsonReadBinaryState state; - size_t data_pos = 0; - - if (!err && !list_is_empty(&client->write_requests)) data = (char *)loc_alloc(size); - - json_read_binary_start(&state, &c->inp); - for (;;) { - if (data != NULL) { - size_t rd = json_read_binary_data(&state, data + data_pos, size - offs - data_pos); - if (rd == 0) break; - data_pos += rd; - } - else { - char buf[256]; - size_t rd = json_read_binary_data(&state, buf, sizeof(buf)); - if (rd == 0) break; - if (!err) { - size_t done = 0; - if (virtual_stream_add_data(client->stream, buf, rd, &done, 0) < 0) err = errno; - assert(done <= rd); - offs += done; - if (!err && done < rd) { - data = (char *)loc_alloc(size - offs); - memcpy(data, buf + done, rd - done); - data_pos = rd - done; - } - } - } - } - json_read_binary_end(&state); - } - - if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); - if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); - - if (data != NULL) { - WriteRequest * r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest)); - list_init(&r->link_client); - r->client = client; - r->data = data; - r->size = size - offs; - strlcpy(r->token, token, sizeof(r->token)); - list_add_last(&r->link_client, &client->write_requests); - } - else { - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); - } -} - -static void command_eos(char * token, Channel * c) { - char id[256]; - StreamClient * client = NULL; - size_t done = 0; - WriteRequest * r = NULL; - int err = 0; - - json_read_string(&c->inp, id, sizeof(id)); - if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); - if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); - - client = find_client(id, c); - if (client == NULL) err = errno; - if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED; - if (!err && !list_is_empty(&client->write_requests)) r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest)); - if (!err && r == NULL && virtual_stream_add_data(client->stream, NULL, 0, &done, 1) < 0) err = errno; - - if (r != NULL) { - list_init(&r->link_client); - r->client = client; - r->eos = 1; - strlcpy(r->token, token, sizeof(r->token)); - list_add_last(&r->link_client, &client->write_requests); - } - else { - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); - } -} - -static void command_connect(char * token, Channel * c) { - char id[256]; - int err = 0; - - json_read_string(&c->inp, id, sizeof(id)); - if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); - if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); - - if (find_client(id, c) == NULL) { - VirtualStream * stream = virtual_stream_find(id); - if (stream == NULL) err = errno; - else create_client(stream, c); - } - - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); -} - -static void command_disconnect(char * token, Channel * c) { - char id[256]; - StreamClient * client = NULL; - int err = 0; - - json_read_string(&c->inp, id, sizeof(id)); - if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); - if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); - - client = find_client(id, c); - if (client == NULL) err = errno; - if (!err) delete_client(client); - - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); -} - -static void channel_close_listener(Channel * c) { - LINK * l = NULL; - - for (l = clients.next; l != &clients;) { - StreamClient * client = all2client(l); - l = l->next; - if (client->channel == c) { - trace(LOG_ALWAYS, "Stream is left connected by client: VS%d", client->stream->id); - delete_client(client); - } - } - - for (l = subscriptions.next; l != &subscriptions;) { - Subscription * h = all2subscription(l); - l = l->next; - if (h->channel == c) { - delete_subscription(h); - } - } -} - -void ini_streams_service(Protocol * proto) { - int i; - - list_init(&clients); - list_init(&streams); - list_init(&subscriptions); - for (i = 0; i < HANDLE_HASH_SIZE; i++) { - list_init(&handle_hash[i]); - } - - add_channel_close_listener(channel_close_listener); - - add_command_handler(proto, STREAMS, "subscribe", command_subscribe); - add_command_handler(proto, STREAMS, "unsubscribe", command_unsubscribe); - add_command_handler(proto, STREAMS, "read", command_read); - add_command_handler(proto, STREAMS, "write", command_write); - add_command_handler(proto, STREAMS, "eos", command_eos); - add_command_handler(proto, STREAMS, "connect", command_connect); - add_command_handler(proto, STREAMS, "disconnect", command_disconnect); -} - -#endif /* SERVICE_Streams */ |