/******************************************************************************* * Copyright (c) 2009, 2011 Wind River Systems, Inc. and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * You may elect to redistribute this code under either of these licenses. * * Contributors: * Wind River Systems - initial API and implementation *******************************************************************************/ /* * TCF Streams - generic streams service. */ #include #if SERVICE_Streams #include #include #include #include #include #include #include #include #include #include static const char * STREAMS = "Streams"; typedef struct StreamClient StreamClient; typedef struct ReadRequest ReadRequest; typedef struct WriteRequest WriteRequest; typedef struct Subscription Subscription; #define STREAM_MAGIC 0x29465398 struct VirtualStream { LINK link_all; unsigned magic; char type[256]; unsigned id; unsigned access; VirtualStreamCallBack * callback; void * callback_args; int ref_cnt; int deleted; LINK clients; uint64_t pos; char * buf; size_t buf_len; size_t buf_inp; size_t buf_out; unsigned eos_inp; unsigned eos_out; unsigned data_available_posted; unsigned space_available_posted; }; struct StreamClient { LINK link_hash; LINK link_stream; LINK link_all; LINK read_requests; LINK write_requests; VirtualStream * stream; Channel * channel; uint64_t pos; }; struct ReadRequest { LINK link_client; StreamClient * client; char token[256]; size_t size; }; struct WriteRequest { LINK link_client; StreamClient * client; char token[256]; char * data; size_t offs; size_t size; int eos; }; struct Subscription { LINK link_all; char type[256]; Channel * channel; }; #define hash2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_hash))) #define stream2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_stream))) #define all2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_all))) #define all2subscription(A) ((Subscription *)((char *)(A) - offsetof(Subscription, link_all))) #define all2stream(A) ((VirtualStream *)((char *)(A) - offsetof(VirtualStream, link_all))) #define client2read_request(A) ((ReadRequest *)((char *)(A) - offsetof(ReadRequest, link_client))) #define client2write_request(A) ((WriteRequest *)((char *)(A) - offsetof(WriteRequest, link_client))) #define HANDLE_HASH_SIZE (4 * MEM_USAGE_FACTOR - 1) static LINK handle_hash[HANDLE_HASH_SIZE]; static LINK clients = TCF_LIST_INIT(clients); static LINK streams = TCF_LIST_INIT(streams); static LINK subscriptions = TCF_LIST_INIT(subscriptions); static unsigned id_cnt = 0; static unsigned get_client_hash(unsigned id, Channel * c) { return (id + (unsigned)(uintptr_t)c) % HANDLE_HASH_SIZE; } static int str2id(char * s, unsigned * id) { char * p = NULL; if (*s++ != 'V') return 0; if (*s++ != 'S') return 0; *id = (unsigned)strtoul(s, &p, 10); return *p == 0; } void virtual_stream_get_id(VirtualStream * stream, char * id_buf, size_t buf_size) { snprintf(id_buf, buf_size, "VS%u", stream->id); } static StreamClient * find_client(char * s, Channel * c) { unsigned id = 0; if (str2id(s, &id)) { unsigned h = get_client_hash(id, c); LINK * l = handle_hash[h].next; while (l != &handle_hash[h]) { StreamClient * client = hash2client(l); if (client->stream->id == id && client->channel == c) return client; l = l->next; } } errno = set_fmt_errno(ERR_OTHER, "No such stream: %s", s); return NULL; } static void send_event_stream_created(OutputStream * out, VirtualStream * stream, const char * context_id) { char id[256]; virtual_stream_get_id(stream, id, sizeof(id)); write_stringz(out, "E"); write_stringz(out, STREAMS); write_stringz(out, "created"); json_write_string(out, stream->type); write_stream(out, 0); json_write_string(out, id); write_stream(out, 0); json_write_string(out, context_id); write_stream(out, 0); write_stream(out, MARKER_EOM); } static void send_event_stream_disposed(OutputStream * out, VirtualStream * stream) { char id[256]; virtual_stream_get_id(stream, id, sizeof(id)); write_stringz(out, "E"); write_stringz(out, STREAMS); write_stringz(out, "disposed"); json_write_string(out, stream->type); write_stream(out, 0); json_write_string(out, id); write_stream(out, 0); write_stream(out, MARKER_EOM); } static void delete_read_request(ReadRequest * r) { Channel * c = r->client->channel; Trap trap; if (set_trap(&trap)) { write_stringz(&c->out, "R"); write_stringz(&c->out, r->token); write_stringz(&c->out, "null"); write_errno(&c->out, ERR_COMMAND_CANCELLED); json_write_long(&c->out, 0); write_stream(&c->out, 0); json_write_boolean(&c->out, 1); write_stream(&c->out, 0); write_stream(&c->out, MARKER_EOM); clear_trap(&trap); } else { trace(LOG_ALWAYS, "Exception handling pending stream read command: %d %s", trap.error, errno_to_str(trap.error)); } list_remove(&r->link_client); loc_free(r); } static void delete_write_request(WriteRequest * r, int error) { Channel * c = r->client->channel; Trap trap; if (set_trap(&trap)) { write_stringz(&c->out, "R"); write_stringz(&c->out, r->token); write_errno(&c->out, error); write_stream(&c->out, MARKER_EOM); clear_trap(&trap); } else { trace(LOG_ALWAYS, "Exception handling pending stream write command: %d %s", trap.error, errno_to_str(trap.error)); } list_remove(&r->link_client); loc_free(r->data); loc_free(r); } static void delete_stream(void * args) { VirtualStream * stream = (VirtualStream *)args; assert(stream->magic == STREAM_MAGIC); assert(list_is_empty(&stream->clients)); assert(stream->deleted); stream->magic = 0; list_remove(&stream->link_all); loc_free(stream->buf); loc_free(stream); } static void notify_data_available(void * args) { VirtualStream * stream = (VirtualStream *)args; assert(stream->magic == STREAM_MAGIC); stream->data_available_posted = 0; if (stream->deleted || stream->eos_out) return; stream->callback(stream, VS_EVENT_DATA_AVAILABLE, stream->callback_args); } static void notify_space_available(void * args) { VirtualStream * stream = (VirtualStream *)args; assert(stream->magic == STREAM_MAGIC); stream->space_available_posted = 0; if (stream->deleted || stream->eos_inp) return; stream->callback(stream, VS_EVENT_SPACE_AVAILABLE, stream->callback_args); } static void advance_stream_buffer(VirtualStream * stream) { size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; uint64_t min_pos = ~(uint64_t)0; uint64_t buf_pos = stream->pos - len; LINK * l; assert(stream->access & VS_ENABLE_REMOTE_READ); for (l = stream->clients.next; l != &stream->clients; l = l->next) { StreamClient * client = stream2client(l); assert(client->pos <= stream->pos); if (client->pos < min_pos) min_pos = client->pos; } if (min_pos == ~(uint64_t)0) { stream->buf_out = stream->buf_inp; } else if (min_pos > buf_pos) { assert(min_pos - buf_pos <= len); stream->buf_out = (stream->buf_out + (unsigned)(min_pos - buf_pos)) % stream->buf_len; } else if (stream->pos - min_pos >= stream->buf_len) { /* TODO: drop stream data */ assert(0); } if (len != (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len && !stream->space_available_posted) { post_event(notify_space_available, stream); stream->space_available_posted = 1; } } static StreamClient * create_client(VirtualStream * stream, Channel * channel) { size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; StreamClient * client = (StreamClient *)loc_alloc_zero(sizeof(StreamClient)); list_init(&client->link_hash); list_init(&client->link_stream); list_init(&client->link_all); list_init(&client->read_requests); list_init(&client->write_requests); client->stream = stream; client->channel = channel; client->pos = stream->pos - len; list_add_first(&client->link_hash, &handle_hash[get_client_hash(stream->id, channel)]); list_add_first(&client->link_stream, &stream->clients); list_add_first(&client->link_all, &clients); stream->ref_cnt++; return client; } static void delete_client(StreamClient * client) { VirtualStream * stream = client->stream; Trap trap; LINK * n; assert(stream->ref_cnt > 0); if (set_trap(&trap)) { send_event_stream_disposed(&client->channel->out, stream); clear_trap(&trap); } else { trace(LOG_ALWAYS, "Exception sending stream deleted event: %d %s", trap.error, errno_to_str(trap.error)); } list_remove(&client->link_hash); list_remove(&client->link_stream); list_remove(&client->link_all); for (n = client->read_requests.next; n != &client->read_requests;) { ReadRequest * r = client2read_request(n); n = n->next; delete_read_request(r); } for (n = client->write_requests.next; n != &client->write_requests;) { WriteRequest * r = client2write_request(n); n = n->next; delete_write_request(r, ERR_COMMAND_CANCELLED); } loc_free(client); if (--stream->ref_cnt == 0) { assert(list_is_empty(&stream->clients)); assert(stream->deleted); post_event(delete_stream, stream); } else if (stream->access & VS_ENABLE_REMOTE_READ) { advance_stream_buffer(stream); } } static void delete_subscription(Subscription * s) { list_remove(&s->link_all); loc_free(s); } static void send_read_reply(StreamClient * client, char * token, size_t size) { VirtualStream * stream = client->stream; Channel * c = client->channel; size_t lost = 0; size_t read1 = 0; size_t read2 = 0; int eos = 0; char * data1 = NULL; char * data2 = NULL; size_t pos; size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; assert(len > 0 || stream->eos_inp); assert(client->pos <= stream->pos); if ((uint64_t)len < stream->pos - client->pos) { lost = (long)(stream->pos - client->pos - len); } else { len = (unsigned)(stream->pos - client->pos); } pos = (stream->buf_inp + stream->buf_len - len) % stream->buf_len; if (len > size) len = size; data1 = stream->buf + pos; if (pos + len <= stream->buf_len) { read1 = len; } else { read1 = stream->buf_len - pos; data2 = stream->buf; read2 = len - read1; } assert(read1 + read2 == len); client->pos += lost + read1 + read2; assert(client->pos <= stream->pos); if (client->pos == stream->pos && stream->eos_inp) eos = 1; assert(eos || lost + read1 + read2 > 0); write_stringz(&c->out, "R"); write_stringz(&c->out, token); if (read1 + read2 > 0) { JsonWriteBinaryState state; json_write_binary_start(&state, &c->out, read1 + read2); json_write_binary_data(&state, data1, read1); json_write_binary_data(&state, data2, read2); json_write_binary_end(&state); write_stream(&c->out, 0); } else { write_stringz(&c->out, "null"); } write_errno(&c->out, 0); json_write_long(&c->out, lost); write_stream(&c->out, 0); json_write_boolean(&c->out, eos); write_stream(&c->out, 0); write_stream(&c->out, MARKER_EOM); } void virtual_stream_create(const char * type, const char * context_id, size_t buf_len, unsigned access, VirtualStreamCallBack * callback, void * callback_args, VirtualStream ** res) { LINK * l; VirtualStream * stream = (VirtualStream *)loc_alloc_zero(sizeof(VirtualStream)); buf_len++; list_init(&stream->clients); strlcpy(stream->type, type, sizeof(stream->type)); stream->magic = STREAM_MAGIC; stream->id = id_cnt++; stream->access = access; stream->callback = callback; stream->callback_args = callback_args; stream->ref_cnt = 1; stream->buf = (char *)loc_alloc(buf_len); stream->buf_len = buf_len; for (l = subscriptions.next; l != &subscriptions; l = l->next) { Subscription * h = all2subscription(l); if (strcmp(type, h->type) == 0) { Trap trap; create_client(stream, h->channel); if (set_trap(&trap)) { send_event_stream_created(&h->channel->out, stream, context_id); clear_trap(&trap); } else { trace(LOG_ALWAYS, "Exception sending stream created event: %d %s", trap.error, errno_to_str(trap.error)); } } } list_add_first(&stream->link_all, &streams); *res = stream; } VirtualStream * virtual_stream_find(char * id) { LINK * l; unsigned n = 0; if (str2id(id, &n)) { for (l = streams.next; l != &streams; l = l->next) { VirtualStream * stream = all2stream(l); if (stream->id == n && !stream->deleted) return stream; } } errno = ERR_INV_CONTEXT; return NULL; } int virtual_stream_add_data(VirtualStream * stream, char * buf, size_t buf_size, size_t * data_size, int eos) { int err = 0; assert(stream->magic == STREAM_MAGIC); if (stream->eos_inp) err = ERR_EOF; if (!err) { size_t len = (stream->buf_out + stream->buf_len - stream->buf_inp - 1) % stream->buf_len; assert(len < stream->buf_len); if (buf_size < len) len = buf_size; if (stream->buf_inp + len <= stream->buf_len) { memcpy(stream->buf + stream->buf_inp, buf, len); } else { size_t x = stream->buf_len - stream->buf_inp; size_t y = len - x; memcpy(stream->buf + stream->buf_inp, buf, x); memcpy(stream->buf, buf + x, y); } stream->buf_inp = (stream->buf_inp + len) % stream->buf_len; stream->pos += len; *data_size = len; if (eos && buf_size == len) stream->eos_inp = 1; } if (stream->access & VS_ENABLE_REMOTE_READ) { if (!err && (stream->eos_inp || *data_size > 0)) { LINK * l; for (l = stream->clients.next; l != &stream->clients; l = l->next) { StreamClient * client = stream2client(l); while (!list_is_empty(&client->read_requests) && (client->pos < stream->pos || stream->eos_inp)) { ReadRequest * r = client2read_request(client->read_requests.next); list_remove(&r->link_client); send_read_reply(client, r->token, r->size); loc_free(r); } } advance_stream_buffer(stream); } } else if (!stream->data_available_posted) { post_event(notify_data_available, stream); stream->data_available_posted = 1; } errno = err; return err ? -1 : 0; } int virtual_stream_get_data(VirtualStream * stream, char * buf, size_t buf_size, size_t * data_size, int * eos) { size_t len; assert(stream->magic == STREAM_MAGIC); len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; if (len > buf_size) { len = buf_size; *eos = 0; } else { *eos = stream->eos_inp; } *data_size = len; if (*eos) stream->eos_out = 1; if (stream->buf_out + len <= stream->buf_len) { memcpy(buf, stream->buf + stream->buf_out, len); } else { size_t x = stream->buf_len - stream->buf_out; size_t y = len - x; memcpy(buf, stream->buf + stream->buf_out, x); memcpy(buf + x, stream->buf, y); } if (stream->access & VS_ENABLE_REMOTE_WRITE) { LINK * l; for (l = stream->clients.next; l != &stream->clients; l = l->next) { StreamClient * client = stream2client(l); if (!list_is_empty(&client->write_requests)) { WriteRequest * r = client2write_request(client->write_requests.next); size_t done = 0; int error = 0; if (virtual_stream_add_data(client->stream, r->data + r->offs, r->size - r->offs, &done, r->eos) < 0) error = errno; r->offs += done; if (error || r->offs >= r->size) { delete_write_request(r, error); } while (error && !list_is_empty(&client->write_requests)) { r = client2write_request(client->write_requests.next); delete_write_request(r, ERR_COMMAND_CANCELLED); } } } } if ((stream->access & VS_ENABLE_REMOTE_READ) == 0 && len > 0) { stream->buf_out = (stream->buf_out + len) % stream->buf_len; assert(!*eos || stream->buf_out == stream->buf_inp); if (!stream->space_available_posted) { post_event(notify_space_available, stream); stream->space_available_posted = 1; } } return 0; } int virtual_stream_is_empty(VirtualStream * stream) { assert(stream->magic == STREAM_MAGIC); assert(!stream->deleted); return stream->buf_out == stream->buf_inp; } void virtual_stream_delete(VirtualStream * stream) { assert(stream->magic == STREAM_MAGIC); assert(!stream->deleted); stream->deleted = 1; if (--stream->ref_cnt > 0) return; assert(list_is_empty(&stream->clients)); post_event(delete_stream, stream); } static void command_subscribe(char * token, Channel * c) { char type[256]; int err = 0; LINK * l; json_read_string(&c->inp, type, sizeof(type)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); for (l = subscriptions.next; l != &subscriptions;) { Subscription * h = all2subscription(l); l = l->next; if (h->channel == c && strcmp(type, h->type) == 0) { err = set_errno(ERR_OTHER, "Already subscribed"); break; } } if (err == 0) { Subscription * s = (Subscription *)loc_alloc_zero(sizeof(Subscription)); list_init(&s->link_all); list_add_first(&s->link_all, &subscriptions); strlcpy(s->type, type, sizeof(s->type)); s->channel = c; } write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_errno(&c->out, err); write_stream(&c->out, MARKER_EOM); } static void command_unsubscribe(char * token, Channel * c) { char type[256]; int err = 0; Subscription * s = NULL; LINK * l; json_read_string(&c->inp, type, sizeof(type)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); for (l = subscriptions.next; l != &subscriptions;) { Subscription * h = all2subscription(l); l = l->next; if (h->channel == c && strcmp(type, h->type) == 0) { s = h; break; } } if (s == NULL) err = ERR_INV_CONTEXT; if (err == 0) delete_subscription(s); write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_errno(&c->out, err); write_stream(&c->out, MARKER_EOM); } static void command_read(char * token, Channel * c) { char id[256]; size_t size = 0; StreamClient * client = NULL; int err = 0; json_read_string(&c->inp, id, sizeof(id)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); size = json_read_ulong(&c->inp); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); client = find_client(id, c); if (client == NULL) err = errno; if (!err && (client->stream->access & VS_ENABLE_REMOTE_READ) == 0) err = ERR_UNSUPPORTED; if (err == 0) { VirtualStream * stream = client->stream; if (client->pos == stream->pos && !stream->eos_inp) { ReadRequest * r = (ReadRequest *)loc_alloc_zero(sizeof(ReadRequest)); list_init(&r->link_client); r->client = client; r->size = size; strlcpy(r->token, token, sizeof(r->token)); list_add_last(&r->link_client, &client->read_requests); } else { assert(list_is_empty(&client->read_requests)); assert(client->channel == c); send_read_reply(client, token, size); advance_stream_buffer(stream); } } else { write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_stringz(&c->out, "null"); write_errno(&c->out, err); json_write_long(&c->out, 0); write_stream(&c->out, 0); json_write_boolean(&c->out, 0); write_stream(&c->out, 0); write_stream(&c->out, MARKER_EOM); } } static void command_write(char * token, Channel * c) { char id[256]; StreamClient * client = NULL; long size = 0; long offs = 0; char * data = NULL; int err = 0; json_read_string(&c->inp, id, sizeof(id)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); size = json_read_long(&c->inp); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); client = find_client(id, c); if (client == NULL) err = errno; if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED; { JsonReadBinaryState state; size_t data_pos = 0; if (!err && !list_is_empty(&client->write_requests)) data = (char *)loc_alloc(size); json_read_binary_start(&state, &c->inp); for (;;) { if (data != NULL) { size_t rd = json_read_binary_data(&state, data + data_pos, size - offs - data_pos); if (rd == 0) break; data_pos += rd; } else { char buf[256]; size_t rd = json_read_binary_data(&state, buf, sizeof(buf)); if (rd == 0) break; if (!err) { size_t done = 0; if (virtual_stream_add_data(client->stream, buf, rd, &done, 0) < 0) err = errno; assert(done <= rd); offs += done; if (!err && done < rd) { data = (char *)loc_alloc(size - offs); memcpy(data, buf + done, rd - done); data_pos = rd - done; } } } } json_read_binary_end(&state); } if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); if (data != NULL) { WriteRequest * r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest)); list_init(&r->link_client); r->client = client; r->data = data; r->size = size - offs; strlcpy(r->token, token, sizeof(r->token)); list_add_last(&r->link_client, &client->write_requests); } else { write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_errno(&c->out, err); write_stream(&c->out, MARKER_EOM); } } static void command_eos(char * token, Channel * c) { char id[256]; StreamClient * client = NULL; size_t done = 0; WriteRequest * r = NULL; int err = 0; json_read_string(&c->inp, id, sizeof(id)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); client = find_client(id, c); if (client == NULL) err = errno; if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED; if (!err && !list_is_empty(&client->write_requests)) r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest)); if (!err && r == NULL && virtual_stream_add_data(client->stream, NULL, 0, &done, 1) < 0) err = errno; if (r != NULL) { list_init(&r->link_client); r->client = client; r->eos = 1; strlcpy(r->token, token, sizeof(r->token)); list_add_last(&r->link_client, &client->write_requests); } else { write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_errno(&c->out, err); write_stream(&c->out, MARKER_EOM); } } static void command_connect(char * token, Channel * c) { char id[256]; int err = 0; json_read_string(&c->inp, id, sizeof(id)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); if (find_client(id, c) == NULL) { VirtualStream * stream = virtual_stream_find(id); if (stream == NULL) err = errno; else create_client(stream, c); } write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_errno(&c->out, err); write_stream(&c->out, MARKER_EOM); } static void command_disconnect(char * token, Channel * c) { char id[256]; StreamClient * client = NULL; int err = 0; json_read_string(&c->inp, id, sizeof(id)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); client = find_client(id, c); if (client == NULL) err = errno; if (!err) delete_client(client); write_stringz(&c->out, "R"); write_stringz(&c->out, token); write_errno(&c->out, err); write_stream(&c->out, MARKER_EOM); } static void channel_close_listener(Channel * c) { LINK * l = NULL; for (l = clients.next; l != &clients;) { StreamClient * client = all2client(l); l = l->next; if (client->channel == c) { trace(LOG_ALWAYS, "Stream is left connected by client: VS%d", client->stream->id); delete_client(client); } } for (l = subscriptions.next; l != &subscriptions;) { Subscription * h = all2subscription(l); l = l->next; if (h->channel == c) { delete_subscription(h); } } } void ini_streams_service(Protocol * proto) { int i; for (i = 0; i < HANDLE_HASH_SIZE; i++) { list_init(&handle_hash[i]); } add_channel_close_listener(channel_close_listener); add_command_handler(proto, STREAMS, "subscribe", command_subscribe); add_command_handler(proto, STREAMS, "unsubscribe", command_unsubscribe); add_command_handler(proto, STREAMS, "read", command_read); add_command_handler(proto, STREAMS, "write", command_write); add_command_handler(proto, STREAMS, "eos", command_eos); add_command_handler(proto, STREAMS, "connect", command_connect); add_command_handler(proto, STREAMS, "disconnect", command_disconnect); } #endif /* SERVICE_Streams */