Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDidier Brachet2016-10-20 14:35:33 +0000
committerDidier Brachet2016-10-20 14:35:33 +0000
commitf4314ff5242cd3298f7854cb8ae567974532e643 (patch)
tree248d518cce2a431fad1211d70e6c4294c0bdb555 /agent/tcf/framework/channel_lws.c
parent1fcf7ebda310f3866cb8916e6b2b4dedcbc50b62 (diff)
downloadorg.eclipse.tcf.agent-f4314ff5242cd3298f7854cb8ae567974532e643.tar.gz
org.eclipse.tcf.agent-f4314ff5242cd3298f7854cb8ae567974532e643.tar.xz
org.eclipse.tcf.agent-f4314ff5242cd3298f7854cb8ae567974532e643.zip
TCF Agent: Added support for WebSocket channel
Added support for WebSocket channel based on libwebsockets library. This channel allows establishing a WebSocket or secured WebSocket connection between TCF peers.
Diffstat (limited to 'agent/tcf/framework/channel_lws.c')
-rw-r--r--agent/tcf/framework/channel_lws.c1606
1 files changed, 1606 insertions, 0 deletions
diff --git a/agent/tcf/framework/channel_lws.c b/agent/tcf/framework/channel_lws.c
new file mode 100644
index 00000000..eba887a8
--- /dev/null
+++ b/agent/tcf/framework/channel_lws.c
@@ -0,0 +1,1606 @@
+/*******************************************************************************
+ * Copyright (c) 2016 Wind River Systems, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ * You may elect to redistribute this code under either of these licenses.
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+
+/*
+ * Implements input and output stream over WebSocket (both secured and
+ * insecured).
+ * This is implemented on top of libwebsockets library; for more information
+ * about libwebsockets, look at https://libwebsockets.org/.
+ */
+#if defined(__GNUC__) && !defined(_GNU_SOURCE)
+# define _GNU_SOURCE
+#endif
+#include <tcf/config.h>
+
+#if ENABLE_LibWebSockets
+#include <fcntl.h>
+#include <stddef.h>
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <ctype.h>
+#if defined(__linux__)
+#include <sys/epoll.h>
+#endif
+#include <tcf/framework/mdep-threads.h>
+#include <tcf/framework/mdep-fs.h>
+#include <tcf/framework/mdep-inet.h>
+#include <tcf/framework/tcf.h>
+#include <tcf/framework/channel.h>
+#include <tcf/framework/channel_lws.h>
+#include <tcf/framework/channel_lws_ext.h>
+#include <tcf/framework/myalloc.h>
+#include <tcf/framework/protocol.h>
+#include <tcf/framework/errors.h>
+#include <tcf/framework/events.h>
+#include <tcf/framework/exceptions.h>
+#include <tcf/framework/trace.h>
+#include <tcf/framework/json.h>
+#include <tcf/framework/peer.h>
+#include <tcf/framework/ip_ifc.h>
+#include <tcf/framework/asyncreq.h>
+#include <tcf/framework/inputbuf.h>
+#include <tcf/framework/outputbuf.h>
+#include <tcf/services/discovery.h>
+#include <libwebsockets.h>
+
+#ifdef WIN32
+#ifndef ECONNREFUSED
+#define ECONNREFUSED WSAECONNREFUSED
+#endif
+#endif
+
+#ifndef MSG_MORE
+#define MSG_MORE 0
+#endif
+
+#if defined(__linux__)
+#define ENABLE_Epoll 1
+#else
+#define ENABLE_Epoll 0
+#endif
+
+/* It seems that epoll and poll events macros are identical. This is
+ * tested in the init routine. Let's make the conversion routine a nop.
+ */
+#define POLL_TO_EPOLL_EVENT(events,eevents) eevents = events;
+#define EPOLL_TO_POLL_EVENT(events,eevents) events = eevents;
+
+#define BUF_SIZE OUTPUT_QUEUE_BUF_SIZE
+#define CHANNEL_MAGIC 0x43253234
+#define MAX_IFC 10
+#define MAX_CONTEXT_THREADS 8
+#define MAX_CONN_REQUESTS 16
+
+typedef struct ChannelConnectInfo {
+ LINK link;
+ ChannelConnectCallBack callback;
+ void * callback_args;
+ int is_ssl;
+ int self_signed;
+ int port;
+ char * host;
+ const char * get_url;
+ int error;
+ struct SessionData * data;
+ const char * ca_filepath;
+ const char * certificate;
+ const char * cipher_list;
+ const char * key;
+ unsigned int options;
+} ChannelConnectInfo;
+
+typedef struct ServerCreateInfo {
+ LINK link;
+ int port;
+ int error;
+ const char * ca_filepath;
+ const char * certificate;
+ const char * key;
+ const char * cipher_list;
+ unsigned int options;
+ struct ServerWS * si;
+} ServerCreateInfo;
+
+#define link2cci(A) ((ChannelConnectInfo *)((char *)(A) - offsetof(ChannelConnectInfo, link)))
+#define link2sci(A) ((ServerCreateInfo *)((char *)(A) - offsetof(ServerCreateInfo, link)))
+
+typedef struct ChannelWS {
+ Channel chan; /* Public channel information - must be first */
+ int magic; /* Magic number */
+ int lock_cnt; /* Stream lock count, when > 0 channel cannot be deleted */
+ int read_pending; /* Read request is pending */
+ int read_done_posted;
+ unsigned char * read_buf;
+ ssize_t read_buf_size;
+ int read_done;
+
+ /* Input stream buffer */
+ InputBuf ibuf;
+
+ /* Output stream state */
+ unsigned char * out_bin_block;
+ OutputBuffer * obuf;
+ int out_errno;
+ int out_flush_cnt; /* Number of posted lazy flush events */
+ int out_eom_cnt; /* Number of end-of-message markers in the output buffer */
+ OutputQueue out_queue;
+ int is_ssl;
+ struct {
+ char data [BUF_SIZE + LWS_SEND_BUFFER_PRE_PADDING +
+ LWS_SEND_BUFFER_POST_PADDING];
+ ssize_t len;
+ ssize_t written;
+ int error;
+ } outbuf;
+ struct ChannelInputBuffer * inbuf;
+ struct SessionData * data;
+ int closing;
+} ChannelWS;
+
+typedef struct ChannelInputBuffer ChannelInputBuffer;
+struct ChannelInputBuffer {
+ void * in;
+ ssize_t len;
+ struct ChannelInputBuffer * next;
+ int error;
+ struct SessionData * data;
+};
+
+typedef struct ServerWS {
+ ChannelServer serv;
+ LINK servlink;
+ int is_ssl;
+ int exiting;
+} ServerWS;
+
+typedef struct SessionData {
+ pthread_mutex_t mutex;
+ ChannelWS * c;
+ ServerWS * si; /* server structure (null if client connection) */
+ struct lws * wsi;
+ struct sockaddr * addr_buf; /* Socket remote address */
+ socklen_t addr_len;
+ char ** prop_names;
+ char ** prop_values;
+ unsigned prop_cnt;
+} SessionData;
+
+typedef struct WSIUserData {
+ SessionData * data; /* session data */
+ ChannelConnectInfo * args; /* connection args (client connection only) */
+ void * cb_arg;
+} WSIUserData;
+
+
+#define channel2ws(A) ((ChannelWS *)((char *)(A) - offsetof(ChannelWS, chan)))
+#define inp2channel(A) ((Channel *)((char *)(A) - offsetof(Channel, inp)))
+#define out2channel(A) ((Channel *)((char *)(A) - offsetof(Channel, out)))
+#define server2ws(A) ((ServerWS *)((char *)(A) - offsetof(ServerWS, serv)))
+#define servlink2np(A) ((ServerWS *)((char *)(A) - offsetof(ServerWS, servlink)))
+#define ibuf2ws(A) ((ChannelWS *)((char *)(A) - offsetof(ChannelWS, ibuf)))
+#define obuf2ws(A) ((ChannelWS *)((char *)(A) - offsetof(ChannelWS, out_queue)))
+
+static void lws_channel_read_done(void * x);
+static void handle_channel_msg(void * x);
+static int lws_tcf_callback(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len);
+static void server_lws_connect_done(void * x);
+static void channel_lws_connect_done(void * x);
+static void lws_shutdown(ChannelWS * c);
+static void done_write_request(void * args);
+static void lws_lock(Channel * channel);
+static void lws_unlock(Channel * channel);
+
+static LINK server_list;
+static pthread_mutex_t lws_list_mutex;
+static LINK client_connect_list = TCF_LIST_INIT(client_connect_list);
+static LINK server_create_list = TCF_LIST_INIT(server_create_list);
+static int pending_connections = 0; /* number of simultaneous pending connections */
+static int dummy_socket = -1; /* dummy IP socket we use for various operations */
+static struct lws_vhost * client_vhost = NULL;
+
+#if ENABLE_Epoll
+static int epoll_fd; /* epoll file descriptor for lws context */
+static uint32_t * epoll_events; /* epoll events for lws context */
+static int dummy_pipe_fds[2];
+#endif
+static int deny_deflate;
+static pthread_mutex_t poll_mutex;
+static struct lws_context *lws_ctx;
+
+
+/* list of supported protocols and callbacks */
+
+static struct lws_protocols protocols[] = {
+ {"tcf", lws_tcf_callback, sizeof(WSIUserData), BUF_SIZE},
+ {NULL, NULL, 0, 0 } /* end */
+};
+
+static const struct lws_extension exts[] = {
+ {
+ "permessage-deflate",
+ lws_extension_callback_pm_deflate,
+ "permessage-deflate; client_max_window_bits"
+ },
+ {
+ "deflate-frame",
+ lws_extension_callback_pm_deflate,
+ "deflate_frame"
+ },
+ {NULL, NULL, NULL /* terminator */ }
+};
+
+#if ENABLE_Epoll
+static void channel_lws_abort_epoll() {
+ char buf = 0;
+ if (write(dummy_pipe_fds[1], &buf, 1));
+}
+#endif
+
+static void lws_channel_event_read (void * args) {
+ ChannelInputBuffer * buf = (ChannelInputBuffer *)args;
+ ChannelWS * c;
+
+ pthread_mutex_lock(&buf->data->mutex);
+ c = buf->data->c;
+ if (buf->data->c) {
+ if (c->inbuf) {
+ ChannelInputBuffer * inbuf;
+ for (inbuf = c->inbuf; inbuf->next != NULL ; inbuf =
+ inbuf->next)
+ ;
+ assert (inbuf->next == NULL);
+ buf->next = NULL;
+ inbuf->next = buf;
+ }
+ else {
+ buf->next = NULL;
+ c->inbuf = buf;
+ }
+ if (buf->len == 0) c->closing = 1;
+ if (c->read_pending) {
+ if (!c->read_done_posted) {
+ c->read_done_posted = 1;
+ post_event(lws_channel_read_done, c);
+ }
+ }
+ if (c->closing) {
+ /* The channel is now closed; post an additional lws_unlock
+ * call to trigger the deletion of the channel. The original
+ * lock was done when the channel was established
+ */
+ post_event((EventCallBack *)lws_unlock, c);
+ }
+ }
+ pthread_mutex_unlock(&buf->data->mutex);
+}
+
+static void lws_add_channel_property(SessionData * data, char * name, char * value) {
+ data->prop_names = (char **)loc_realloc(data->prop_names, (data->prop_cnt + 1) * sizeof(char *));
+ data->prop_values = (char **)loc_realloc(data->prop_values, (data->prop_cnt + 1) * sizeof(char *));
+ data->prop_names[data->prop_cnt] = name;
+ data->prop_values[data->prop_cnt] = value;
+ data->prop_cnt++;
+}
+
+static void lws_parse_http_header(SessionData * data, struct lws *wsi) {
+ int n = 0, len;
+ char * prop_name;
+ char * prop_value;
+
+ /* Do not extract all properties but only the one that we (currently)
+ * consider as useful; that is the URI and URI args. Extracting
+ * all properties may require a lot of memory (especially with cookies).
+ */
+ len = lws_hdr_total_length(wsi, WSI_TOKEN_GET_URI);
+ if (len) {
+ prop_name = (char *)loc_strdup("get_uri");
+ prop_value = (char *)loc_alloc_zero(len + 1);
+ lws_hdr_copy(wsi, prop_value, len + 1, WSI_TOKEN_GET_URI);
+ lws_add_channel_property(data, prop_name, prop_value);
+ }
+ len = lws_hdr_total_length(wsi, WSI_TOKEN_HTTP_URI_ARGS);
+ if (len) {
+ char * buf = (char *)loc_alloc_zero(len + 1);
+ n = 0;
+ while (lws_hdr_copy_fragment(wsi, buf, len + 1,
+ WSI_TOKEN_HTTP_URI_ARGS, n) > 0) {
+ prop_name = (char *)loc_alloc_zero(32);
+ sprintf(prop_name, "uri-args_%d", n);
+ prop_value = loc_strdup(buf);
+ lws_add_channel_property(data, prop_name, prop_value);
+ n++;
+ }
+ loc_free(buf);
+ }
+}
+
+static int lws_tcf_callback(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len)
+{
+ struct lws_pollargs *pa = (struct lws_pollargs *)in;
+ WSIUserData * userdata = (WSIUserData *) user;
+
+ switch (reason) {
+ case LWS_CALLBACK_CLIENT_ESTABLISHED:
+ case LWS_CALLBACK_ESTABLISHED:
+ {
+ assert (userdata != NULL);
+ int is_server = (reason == LWS_CALLBACK_ESTABLISHED);
+
+ /* Allocate session data. Note that the session data will
+ * be freed from the dispatch thread to avoid any race
+ * condition. Any update to the SessionData will be done
+ * while holding the session data mutex.
+ */
+
+ SessionData * data;
+
+ if (LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len)) return 1;
+
+ data = (SessionData *)loc_alloc_zero(sizeof (SessionData));
+#if defined(SOCK_MAXADDRLEN)
+ data->addr_len = SOCK_MAXADDRLEN;
+#else
+ data->addr_len = 0x1000;
+#endif
+ pthread_mutex_init(&data->mutex, NULL);
+ data->wsi = wsi;
+ data->addr_buf = (struct sockaddr *)loc_alloc_zero(data->addr_len);
+ if (getpeername(lws_get_socket_fd(wsi),
+ data->addr_buf, &data->addr_len) != 0) {
+ data->addr_len = 0;
+ }
+
+ userdata->data = data;
+ if (is_server) {
+ data->si = *(ServerWS **) lws_protocol_vh_priv_get(lws_vhost_get(wsi), protocols);
+#if defined(LWS_OPENSSL_SUPPORT)
+ X509 * certificate = NULL;
+ if (in) certificate = SSL_get_peer_certificate((const SSL *)in);
+ if (certificate) {
+ char * name = X509_NAME_oneline(X509_get_subject_name(certificate), 0, 0);
+ if (name) lws_add_channel_property(data, loc_strdup("PeerCertName"), loc_strdup(name));
+ free(name);
+ X509_free(certificate);
+ }
+#endif
+ lws_parse_http_header(data, wsi);
+
+ post_event(server_lws_connect_done, data);
+ }
+ else {
+ ChannelConnectInfo * args = userdata->args;
+ args->data = data;
+ userdata->args = NULL;
+ post_event(channel_lws_connect_done, args);
+ }
+ break;
+ }
+ case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
+ {
+ assert (userdata != NULL);
+ ChannelConnectInfo * args = userdata->args;
+
+ /* connection error hook is called multiple times; make
+ * sure we do not call the channel connection callback
+ * multiple times; use the error field to detect this.
+ */
+ if (args) {
+ args->error = ECONNREFUSED;
+ userdata->args = NULL;
+ (void) LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len);
+ post_event(channel_lws_connect_done, args);
+ }
+ }
+ break;
+
+ case LWS_CALLBACK_CLOSED:
+ {
+ assert (userdata != NULL);
+ SessionData * data = userdata->data;
+
+ if (data != NULL) {
+ ChannelInputBuffer * buf = (ChannelInputBuffer *)loc_alloc_zero(sizeof(ChannelInputBuffer));
+
+ buf->data = data;
+ pthread_mutex_lock(&buf->data->mutex);
+ assert (buf->data->wsi != NULL);
+ buf->data->wsi = NULL;
+ /* The channel can now be closed. The channel was original locked when
+ * it was created to make sure it can be safely referenced in the service
+ * thread; at this point we know it will no longer be used; we can
+ * unlock it.
+ */
+ if (data->c) {
+ if (data->c->outbuf.len > 0 && data->c->outbuf.written == 0) {
+ data->c->outbuf.written = -1;
+ data->c->outbuf.len = 0;
+ data->c->outbuf.error = ECONNRESET;
+ post_event(done_write_request, data->c);
+ }
+ }
+ post_event(lws_channel_event_read, buf);
+ pthread_mutex_unlock(&data->mutex);
+
+ /* At this point, this session data will no longer be used
+ * again by the LWS context; we can safely delete it from
+ * the dispatch thread.
+ */
+
+ }
+ (void)LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len);
+ }
+ break;
+
+ case LWS_CALLBACK_RECEIVE:
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ assert (userdata != NULL);
+
+ if (len > 0) {
+ ChannelInputBuffer * buf = (ChannelInputBuffer *)loc_alloc_zero(sizeof(ChannelInputBuffer));
+ SessionData * data = userdata->data;
+ buf->in = loc_alloc_zero(len);
+ buf->len = len;
+ buf->data = data;
+ memcpy(buf->in, in, len);
+ post_event(lws_channel_event_read, buf);
+ }
+ break;
+
+ case LWS_CALLBACK_SERVER_WRITEABLE:
+ case LWS_CALLBACK_CLIENT_WRITEABLE:
+ {
+ assert (userdata != NULL);
+ SessionData * data = userdata->data;
+ int n;
+ pthread_mutex_lock(&data->mutex);
+ if (data->c->outbuf.written != 0 || data->c->outbuf.len == 0) {
+ pthread_mutex_unlock(&data->mutex);
+ return 0;
+ }
+ if (data->c->outbuf.len == -1) {
+ /* close connection */
+ pthread_mutex_unlock(&data->mutex);
+ return 1;
+ }
+ n = lws_write(wsi, (unsigned char *)data->c->outbuf.data + LWS_SEND_BUFFER_PRE_PADDING, data->c->outbuf.len, LWS_WRITE_BINARY);
+ if (n < 0) {
+ data->c->outbuf.written = -1;
+ data->c->outbuf.error = errno;
+ } else {
+ data->c->outbuf.written = n;
+ data->c->outbuf.error = 0;
+ }
+ data->c->outbuf.len = 0;
+ pthread_mutex_unlock(&data->mutex);
+ post_event(done_write_request, data->c);
+ }
+ break;
+
+ case LWS_CALLBACK_WSI_DESTROY:
+ if (lws_vhost_get(wsi) == client_vhost) {
+ loc_free(userdata);
+ }
+ break;
+
+ case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
+ if (LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len)) return 1;
+ if ((strcmp((const char *)in, "deflate-stream") == 0) && deny_deflate) {
+ lwsl_notice("denied deflate-stream extension\n");
+ return 1;
+ }
+ if ((strcmp((const char *)in, "x-webkit-deflate-frame") == 0)) return 1;
+ if ((strcmp((const char *)in, "deflate-frame") == 0)) return 1;
+ break;
+
+#if defined(LWS_OPENSSL_SUPPORT)
+ case LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION:
+ {
+ int preverify_ok = len;
+ X509_STORE_CTX *ctx = (X509_STORE_CTX *)user;
+ char buf[256];
+ int err, depth;
+
+ if (LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len)) return 1;
+
+ err = X509_STORE_CTX_get_error(ctx);
+
+ if (!preverify_ok) {
+ X509 * err_cert = X509_STORE_CTX_get_current_cert(ctx);
+ X509_NAME_oneline(X509_get_subject_name(err_cert), buf,
+ sizeof(buf));
+ depth = X509_STORE_CTX_get_error_depth(ctx);
+ trace( LOG_PROTOCOL, "Client certificate verify error:num=%d:%s:depth=%d:%s", err,
+ X509_verify_cert_error_string(err), depth, buf);
+ }
+
+ /*
+ * At this point, err contains the last verification error. We can use
+ * it for something special
+ */
+ if (!preverify_ok
+ && (err == X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT)) {
+ X509_NAME_oneline(X509_get_issuer_name(ctx->current_cert), buf,
+ sizeof(buf));
+ trace( LOG_PROTOCOL, "Client certificate issuer= %s", buf);
+ }
+
+ if (!preverify_ok) return 1;
+ break;
+ }
+#endif
+
+ case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS:
+ (void)LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len);
+ break;
+
+ case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
+ (void)LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len);
+ break;
+
+ /*
+ * callbacks for managing the external poll() array appear in
+ * protocol 0 callback
+ */
+
+ case LWS_CALLBACK_LOCK_POLL:
+ /*
+ * lock mutex to protect pollfd state
+ * called before any other POLL related callback
+ * if protecting wsi lifecycle change, len == 1
+ */
+ if (len) pthread_mutex_lock(&poll_mutex);
+ break;
+
+ case LWS_CALLBACK_UNLOCK_POLL:
+ /*
+ * unlock mutex to protect pollfd state when
+ * called after any other POLL related callback
+ * if protecting wsi lifecycle change, len == 1
+ */
+ if (len) pthread_mutex_unlock(&poll_mutex);
+ break;
+
+#if ENABLE_Epoll
+ case LWS_CALLBACK_ADD_POLL_FD:
+ {
+ struct epoll_event event;
+ POLL_TO_EPOLL_EVENT(pa->events, epoll_events[pa->fd]);
+ memset(&event, 0, sizeof(struct epoll_event));
+ event.data.fd = pa->fd;
+ event.events = epoll_events[pa->fd];
+ if (epoll_ctl (epoll_fd, EPOLL_CTL_ADD, pa->fd, &event) != 0) return 1;
+ break;
+ }
+
+ case LWS_CALLBACK_DEL_POLL_FD:
+ if (epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pa->fd, NULL) != 0) return 1;
+ break;
+
+ case LWS_CALLBACK_CHANGE_MODE_POLL_FD:
+ {
+ struct epoll_event event;
+ memset(&event, 0, sizeof(struct epoll_event));
+ POLL_TO_EPOLL_EVENT(pa->events, epoll_events[pa->fd]);
+ event.data.fd = pa->fd;
+ event.events = epoll_events[pa->fd];
+ if (epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pa->fd, &event) != 0) return 1;
+ break;
+ }
+#endif
+
+ case LWS_CALLBACK_GET_THREAD_ID:
+ {
+ /*
+ * For lws multi-thread support, we need to return a different
+ * thread ID for each thread.
+ *
+ * On Windows, thread ID does not fit on a 32-bit, let's use
+ * thread index instead of thread ID. Note that we must never
+ * return 0 since this will disable locking.
+ */
+
+ if (is_dispatch_thread()) return 1;
+ else return 2;
+ }
+ break;
+ default:
+ if (LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len)) return 1;
+ break;
+ }
+ return 0;
+}
+
+static void delete_channel(ChannelWS * c) {
+ unsigned ix;
+ trace(LOG_PROTOCOL, "Deleting channel %#lx", c);
+ assert(c->lock_cnt == 0);
+ assert(c->out_flush_cnt == 0);
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->read_pending == 0);
+ assert(c->ibuf.handling_msg != HandleMsgTriggered);
+ channel_clear_broadcast_group(&c->chan);
+ list_remove(&c->chan.chanlink);
+ if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root))
+ shutdown_set_stopped(&channel_shutdown);
+ c->magic = 0;
+ output_queue_clear(&c->out_queue);
+ output_queue_free_obuf(c->obuf);
+ loc_free(c->ibuf.buf);
+ loc_free(c->chan.peer_name);
+ loc_free(c->data->addr_buf);
+ if (c->data->prop_cnt) {
+ for (ix = 0; ix < c->data->prop_cnt; ix++) {
+ loc_free(c->data->prop_names[ix]);
+ loc_free(c->data->prop_values[ix]);
+ }
+ loc_free(c->data->prop_names);
+ loc_free(c->data->prop_values);
+ }
+ loc_free(c->data);
+ loc_free(c);
+}
+
+static void lws_lock(Channel * channel) {
+ ChannelWS * c = channel2ws(channel);
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ c->lock_cnt++;
+}
+
+static void lws_unlock(Channel * channel) {
+ ChannelWS * c = channel2ws(channel);
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->lock_cnt > 0);
+ c->lock_cnt--;
+ if (c->lock_cnt == 0) {
+ assert(!c->read_pending);
+ delete_channel(c);
+ }
+}
+static int lws_is_closed(Channel * channel) {
+ ChannelWS * c = channel2ws(channel);
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->lock_cnt > 0);
+ return c->chan.state == ChannelStateDisconnected;
+}
+
+static void done_write_request(void * args) {
+ ChannelWS * c = (ChannelWS *)args;
+ int size = 0;
+ int error = 0;
+
+ assert (c->out_errno == 0);
+ pthread_mutex_lock(&c->data->mutex);
+ if (c->outbuf.written < 0) error = c->outbuf.error;
+ size = c->outbuf.written;
+ c->outbuf.written = 0;
+ c->outbuf.len = 0;
+ pthread_mutex_unlock(&c->data->mutex);
+ output_queue_done(&c->out_queue, error, size);
+ if (error) c->out_errno = error;
+ if (output_queue_is_empty(&c->out_queue) &&
+ c->chan.state == ChannelStateDisconnected) lws_shutdown(c);
+ lws_unlock(&c->chan);
+}
+
+static void post_write_request(OutputBuffer * bf) {
+ ChannelWS * c = obuf2ws(bf->queue);
+ int posted = 0;
+
+ pthread_mutex_lock(&c->data->mutex);
+
+ /* We should not enter here before previous send has been handled */
+ assert (c->outbuf.len == 0);
+
+ if (!c->closing && c->data->wsi) {
+ int res = 0;
+ c->outbuf.written = 0;
+ c->outbuf.len = bf->buf_len - bf->buf_pos;
+ if (c->outbuf.len > BUF_SIZE) c->outbuf.len = BUF_SIZE;
+ memcpy((unsigned char*)c->outbuf.data + LWS_SEND_BUFFER_PRE_PADDING, bf->buf + bf->buf_pos, c->outbuf.len);
+ res = lws_callback_on_writable(c->data->wsi);
+ if (res >= 0) {
+ posted = 1;
+ lws_lock(&c->chan);
+ }
+ }
+
+ if (!posted) {
+ c->outbuf.written = -1;
+ c->outbuf.error = ECONNRESET;
+ post_event(done_write_request, c);
+ lws_lock(&c->chan);
+ }
+ pthread_mutex_unlock(&c->data->mutex);
+}
+
+static void lws_flush_with_flags(ChannelWS * c, int flags) {
+ unsigned char * p = c->obuf->buf;
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->chan.out.end == p + sizeof(c->obuf->buf));
+ assert(c->out_bin_block == NULL);
+ assert(c->chan.out.cur >= p);
+ assert(c->chan.out.cur <= p + sizeof(c->obuf->buf));
+ if (c->chan.out.cur == p) return;
+ if (c->chan.state != ChannelStateDisconnected && c->out_errno == 0) {
+ c->obuf->buf_len = c->chan.out.cur - p;
+ c->out_queue.post_io_request = post_write_request;
+ trace(LOG_PROTOCOL, "Outbuf add size:%d",c->obuf->buf_len);
+
+ output_queue_add_obuf(&c->out_queue, c->obuf);
+ c->obuf = output_queue_alloc_obuf();
+ c->chan.out.end = c->obuf->buf + sizeof(c->obuf->buf);
+ }
+ c->chan.out.cur = c->obuf->buf;
+ c->out_eom_cnt = 0;
+}
+
+static void lws_flush_event(void * x) {
+ ChannelWS * c = (ChannelWS *)x;
+ assert(c->magic == CHANNEL_MAGIC);
+ if (--c->out_flush_cnt == 0) {
+ int congestion_level = c->chan.congestion_level;
+ if (congestion_level > 0) usleep(congestion_level * 2500);
+ lws_flush_with_flags(c, 0);
+ lws_unlock(&c->chan);
+ }
+ else if (c->out_eom_cnt > 3) {
+ lws_flush_with_flags(c, 0);
+ }
+}
+
+static void lws_bin_block_start(ChannelWS * c) {
+ *c->chan.out.cur++ = ESC;
+ *c->chan.out.cur++ = 3;
+#if BUF_SIZE > 0x4000
+ *c->chan.out.cur++ = 0;
+#endif
+ *c->chan.out.cur++ = 0;
+ *c->chan.out.cur++ = 0;
+ c->out_bin_block = c->chan.out.cur;
+}
+
+static void lws_bin_block_end(ChannelWS * c) {
+ size_t len = c->chan.out.cur - c->out_bin_block;
+ if (len == 0) {
+#if BUF_SIZE > 0x4000
+ c->chan.out.cur -= 5;
+#else
+ c->chan.out.cur -= 4;
+#endif
+ }
+ else {
+#if BUF_SIZE > 0x4000
+ *(c->out_bin_block - 3) = (len & 0x7fu) | 0x80u;
+ *(c->out_bin_block - 2) = ((len >> 7) & 0x7fu) | 0x80u;
+ *(c->out_bin_block - 1) = (unsigned char)(len >> 14);
+#else
+ *(c->out_bin_block - 2) = (len & 0x7fu) | 0x80u;
+ *(c->out_bin_block - 1) = (unsigned char)(len >> 7);
+#endif
+ }
+ c->out_bin_block = NULL;
+}
+
+static void lws_write_stream(OutputStream * out, int byte) {
+ ChannelWS * c = channel2ws(out2channel(out));
+ assert(c->magic == CHANNEL_MAGIC);
+ if (!c->chan.out.supports_zero_copy || c->chan.out.cur >= c->chan.out.end - 32 || byte < 0) {
+ if (c->out_bin_block != NULL) lws_bin_block_end(c);
+ if (c->chan.out.cur == c->chan.out.end) lws_flush_with_flags(c, MSG_MORE);
+ if (byte < 0 || byte == ESC) {
+ char esc = 0;
+ *c->chan.out.cur++ = ESC;
+ if (byte == ESC) esc = 0;
+ else if (byte == MARKER_EOM) esc = 1;
+ else if (byte == MARKER_EOS) esc = 2;
+ else assert(0);
+ if (c->chan.out.cur == c->chan.out.end) lws_flush_with_flags(c, MSG_MORE);
+ *c->chan.out.cur++ = esc;
+ if (byte == MARKER_EOM) {
+ c->out_eom_cnt++;
+ if (c->out_flush_cnt < 2) {
+ if (c->out_flush_cnt++ == 0) lws_lock(&c->chan);
+ /*post_event_with_delay(lws_flush_event, c, 0);*/
+ post_event(lws_flush_event, c);
+ }
+ }
+ return;
+ }
+ }
+ else if (c->out_bin_block == NULL) {
+ lws_bin_block_start(c);
+ }
+ *c->chan.out.cur++ = (char)byte;
+}
+
+static void lws_write_block_stream(OutputStream * out, const char * bytes, size_t size) {
+ unsigned char * src = (unsigned char *)bytes;
+ ChannelWS * c = channel2ws(out2channel(out));
+ while (size > 0) {
+ size_t n = out->end - out->cur;
+ if (n > size) n = size;
+ if (n == 0) {
+ lws_write_stream(out, *src++);
+ size--;
+ }
+ else if (c->out_bin_block) {
+ memcpy(out->cur, src, n);
+ out->cur += n;
+ size -= n;
+ src += n;
+ }
+ else if (*src != ESC) {
+ unsigned char * dst = out->cur;
+ unsigned char * end = dst + n;
+ do {
+ unsigned char ch = *src;
+ if (ch == ESC) break;
+ *dst++ = ch;
+ src++;
+ }
+ while (dst < end);
+ size -= dst - out->cur;
+ out->cur = dst;
+ }
+ else {
+ lws_write_stream(out, *src++);
+ size--;
+ }
+ }
+}
+
+static ssize_t lws_splice_block_stream(OutputStream * out, int fd, size_t size, int64_t * offset) {
+ assert(is_dispatch_thread());
+ if (size == 0) return 0;
+ {
+ ssize_t rd;
+ char buffer[BUF_SIZE];
+ if (size > BUF_SIZE) size = BUF_SIZE;
+ if (offset != NULL) {
+ rd = pread(fd, buffer, size, (off_t)*offset);
+ if (rd > 0) *offset += rd;
+ }
+ else {
+ rd = read(fd, buffer, size);
+ }
+ if (rd > 0) lws_write_block_stream(out, buffer, rd);
+ return rd;
+ }
+}
+
+static void lws_post_read(InputBuf * ibuf, unsigned char * buf, size_t size) {
+ ChannelWS * c = ibuf2ws(ibuf);
+ if (c->read_pending) return;
+ c->read_pending = 1;
+ c->read_buf = buf;
+ c->read_buf_size = size;
+ if (c->inbuf) {
+ if (!c->read_done_posted) {
+ c->read_done_posted = 1;
+ post_event(lws_channel_read_done, c);
+ }
+ }
+ else if (c->closing) {
+ if (!c->read_done_posted) {
+ c->read_done_posted = 1;
+ post_event(lws_channel_read_done, c);
+ }
+ }
+}
+
+static void lws_wait_read(InputBuf * ibuf) {
+ ChannelWS * c = ibuf2ws(ibuf);
+
+ /* Wait for read to complete */
+ assert(c->lock_cnt > 0);
+ assert(c->read_pending != 0);
+ if (c->read_done_posted) cancel_event(lws_channel_read_done, c, 1);
+ lws_channel_read_done(c);
+}
+
+static int lws_read_stream(InputStream * inp) {
+ Channel * channel = inp2channel(inp);
+ ChannelWS * c = channel2ws(channel);
+
+ assert(c->lock_cnt > 0);
+ if (inp->cur < inp->end) return *inp->cur++;
+ return ibuf_get_more(&c->ibuf, 0);
+}
+
+static int lws_peek_stream(InputStream * inp) {
+ Channel * channel = inp2channel(inp);
+ ChannelWS * c = channel2ws(channel);
+
+ assert(c->lock_cnt > 0);
+ if (inp->cur < inp->end) return *inp->cur;
+ return ibuf_get_more(&c->ibuf, 1);
+}
+
+static void lws_shutdown(ChannelWS * c) {
+ pthread_mutex_lock(&c->data->mutex);
+ if (c->data->wsi) {
+ c->outbuf.len = -1;
+ /* Post a write request that will trigger a close of the channel */
+ lws_callback_on_writable(c->data->wsi);
+ }
+ pthread_mutex_unlock(&c->data->mutex);
+
+}
+
+static void send_eof_and_close(Channel * channel, int err) {
+ ChannelWS * c = channel2ws(channel);
+
+ assert(c->magic == CHANNEL_MAGIC);
+ if (channel->state == ChannelStateDisconnected) return;
+ ibuf_flush(&c->ibuf);
+ if (c->ibuf.handling_msg == HandleMsgTriggered) {
+ /* Cancel pending message handling */
+ cancel_event(handle_channel_msg, c, 0);
+ c->ibuf.handling_msg = HandleMsgIdle;
+ }
+ write_stream(&c->chan.out, MARKER_EOS);
+ write_errno(&c->chan.out, err);
+ write_stream(&c->chan.out, MARKER_EOM);
+ lws_flush_with_flags(c, 0);
+ if (output_queue_is_empty(&c->out_queue)) lws_shutdown(c);
+ c->chan.state = ChannelStateDisconnected;
+ lws_post_read(&c->ibuf, c->ibuf.buf, c->ibuf.buf_size);
+ notify_channel_closed(channel);
+ if (channel->disconnected) {
+ channel->disconnected(channel);
+ }
+ else {
+ trace(LOG_PROTOCOL, "channel %#lx disconnected", c);
+ if (channel->protocol != NULL) protocol_release(channel->protocol);
+ }
+ channel->protocol = NULL;
+}
+
+static void handle_channel_msg(void * x) {
+ Trap trap;
+ ChannelWS * c = (ChannelWS *)x;
+ int has_msg;
+
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->ibuf.handling_msg == HandleMsgTriggered);
+ assert(c->ibuf.message_count);
+
+ has_msg = ibuf_start_message(&c->ibuf);
+ if (has_msg <= 0) {
+ if (has_msg < 0 && c->chan.state != ChannelStateDisconnected) {
+ trace(LOG_PROTOCOL, "Socket is shutdown by remote peer, channel %#lx %s", c, c->chan.peer_name);
+ channel_close(&c->chan);
+ }
+ }
+ else if (set_trap(&trap)) {
+ if (c->chan.receive) {
+ c->chan.receive(&c->chan);
+ }
+ else {
+ handle_protocol_message(&c->chan);
+ assert(c->out_bin_block == NULL);
+ }
+ clear_trap(&trap);
+ }
+ else {
+ trace(LOG_ALWAYS, "Exception in message handler: %s", errno_to_str(trap.error));
+ send_eof_and_close(&c->chan, trap.error);
+ }
+}
+
+static void channel_check_pending(Channel * channel) {
+ ChannelWS * c = channel2ws(channel);
+
+ assert(is_dispatch_thread());
+ if (c->ibuf.handling_msg == HandleMsgIdle && c->ibuf.message_count) {
+ post_event(handle_channel_msg, c);
+ c->ibuf.handling_msg = HandleMsgTriggered;
+ }
+}
+
+static void lws_trigger_message(InputBuf * ibuf) {
+ ChannelWS * c = ibuf2ws(ibuf);
+
+ assert(is_dispatch_thread());
+ assert(c->ibuf.message_count > 0);
+ if (c->ibuf.handling_msg == HandleMsgIdle) {
+ post_event(handle_channel_msg, c);
+ c->ibuf.handling_msg = HandleMsgTriggered;
+ }
+}
+
+static int channel_get_message_count(Channel * channel) {
+ ChannelWS * c = channel2ws(channel);
+ assert(is_dispatch_thread());
+ if (c->ibuf.handling_msg != HandleMsgTriggered) return 0;
+ return c->ibuf.message_count;
+}
+
+static void lws_channel_read_done(void * x) {
+ ChannelWS * c = (ChannelWS *)x;
+ ssize_t total_length = 0;
+ ssize_t read_length = 0;
+
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->read_pending != 0);
+ assert(c->lock_cnt > 0);
+
+
+ c->read_pending = 0;
+ c->read_done_posted = 0;
+ /* some data is available retrieve it */
+ {
+ total_length = c->inbuf ? c->inbuf->len : 0;
+
+ if (c->inbuf && c->inbuf->error) {
+ if (c->chan.state != ChannelStateDisconnected) {
+ trace(LOG_ALWAYS, "Can't read from Web Socket: %s", errno_to_str(c->inbuf->error));
+ }
+ total_length = 0; /* Treat error as EOF */
+ }
+ if (total_length > 0) {
+ read_length = c->inbuf->len > c->read_buf_size ? c->read_buf_size : c->inbuf->len;
+ memcpy(c->read_buf, c->inbuf->in, read_length);
+ c->inbuf->len -= read_length;
+ if (c->inbuf->len != 0) {
+ memmove((unsigned char *)c->inbuf->in, (unsigned char *)c->inbuf->in + read_length, c->inbuf->len);
+ }
+ }
+ else {
+ /* In case of error, free all received buffers */
+ while (c->inbuf) {
+ ChannelInputBuffer * old_buf = c->inbuf;
+ c->inbuf = old_buf->next;
+ loc_free(old_buf->in);
+ loc_free(old_buf);
+ }
+ }
+
+ if (c->inbuf && c->inbuf->len == 0) {
+ ChannelInputBuffer * old_buf = c->inbuf;
+ c->inbuf = old_buf->next;
+ loc_free(old_buf->in);
+ loc_free(old_buf);
+ }
+ }
+ if (c->chan.state != ChannelStateDisconnected) {
+ ibuf_read_done(&c->ibuf, read_length);
+ }
+ else if (total_length > 0) {
+ lws_post_read(&c->ibuf, c->ibuf.buf, c->ibuf.buf_size);
+ }
+ else {
+ lws_unlock(&c->chan);
+ }
+}
+
+static void start_channel(Channel * channel) {
+ ChannelWS * c = channel2ws(channel);
+
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ notify_channel_created(&c->chan);
+ if (c->chan.connecting) {
+ c->chan.connecting(&c->chan);
+ }
+ else {
+ trace(LOG_PROTOCOL, "channel server connecting");
+ send_hello_message(&c->chan);
+ }
+ ibuf_trigger_read(&c->ibuf);
+}
+
+static ChannelWS * create_channel(int is_ssl, int server) {
+ ChannelWS * c;
+
+ c = (ChannelWS *)loc_alloc_zero(sizeof *c);
+ c->magic = CHANNEL_MAGIC;
+ c->is_ssl = is_ssl;
+ c->chan.inp.read = lws_read_stream;
+ c->chan.inp.peek = lws_peek_stream;
+ c->obuf = output_queue_alloc_obuf();
+ c->chan.out.cur = c->obuf->buf;
+ c->chan.out.end = c->obuf->buf + sizeof(c->obuf->buf);
+ c->chan.out.write = lws_write_stream;
+ c->chan.out.write_block = lws_write_block_stream;
+ c->chan.out.splice_block = lws_splice_block_stream;
+ list_add_last(&c->chan.chanlink, &channel_root);
+ shutdown_set_normal(&channel_shutdown);
+ c->chan.state = ChannelStateStartWait;
+ c->chan.incoming = server;
+ c->chan.start_comm = start_channel;
+ c->chan.check_pending = channel_check_pending;
+ c->chan.message_count = channel_get_message_count;
+ c->chan.lock = lws_lock;
+ c->chan.unlock = lws_unlock;
+ c->chan.is_closed = lws_is_closed;
+ c->chan.close = send_eof_and_close;
+ ibuf_init(&c->ibuf, &c->chan.inp);
+ c->ibuf.post_read = lws_post_read;
+ c->ibuf.wait_read = lws_wait_read;
+ c->ibuf.trigger_message = lws_trigger_message;
+ c->lock_cnt = 1;
+ output_queue_ini(&c->out_queue);
+ return c;
+}
+
+static void refresh_peer_server(int sock, PeerServer * ps) {
+ unsigned i;
+ const char * transport = peer_server_getprop(ps, "TransportName", NULL);
+ assert(transport != NULL);
+ const char *str_port = peer_server_getprop(ps, "Port", NULL);
+
+ int ifcind;
+ struct in_addr src_addr;
+ ip_ifc_info ifclist[MAX_IFC];
+
+ /* For now, we do not support binding WebSocket to a specific interface;
+ * let's return all interfaces.
+ */
+ ifcind = build_ifclist(sock, MAX_IFC, ifclist);
+ while (ifcind-- > 0) {
+ char str_host[64];
+ char str_id[64];
+ PeerServer * ps2;
+ src_addr.s_addr = ifclist[ifcind].addr;
+ ps2 = peer_server_alloc();
+ ps2->flags = ps->flags | PS_FLAG_LOCAL | PS_FLAG_DISCOVERABLE;
+ for (i = 0; i < ps->ind; i++) {
+ peer_server_addprop(ps2, loc_strdup(ps->list[i].name),
+ loc_strdup(ps->list[i].value));
+ }
+ inet_ntop(AF_INET, &src_addr, str_host, sizeof(str_host));
+ snprintf(str_id, sizeof(str_id), "%s:%s:%s", transport, str_host, str_port);
+ peer_server_addprop(ps2, loc_strdup("ID"), loc_strdup(str_id));
+ peer_server_addprop(ps2, loc_strdup("Host"), loc_strdup(str_host));
+ peer_server_addprop(ps2, loc_strdup("Port"), loc_strdup(str_port));
+ peer_server_add(ps2, PEER_DATA_RETENTION_PERIOD * 2);
+ }
+}
+
+static void refresh_all_peer_servers(void * x) {
+ LINK * l = server_list.next;
+ while (l != &server_list) {
+ ServerWS * si = servlink2np(l);
+ refresh_peer_server(dummy_socket, si->serv.ps);
+ l = l->next;
+ }
+ post_event_with_delay(refresh_all_peer_servers, NULL, PEER_DATA_REFRESH_PERIOD * 1000000);
+}
+
+static void set_peer_addr(ChannelWS * c, struct sockaddr * addr, int addr_len) {
+ char nbuf[128];
+ /* Create a human readable channel name that uniquely identifies remote peer */
+ char name[128];
+ if (addr_len == 0) return;
+ assert(addr->sa_family == AF_INET);
+ snprintf(name, sizeof(name), "%s:%s:%d",
+ c->is_ssl ? "WSS" : "WS",
+ inet_ntop(addr->sa_family,
+ &((struct sockaddr_in *)addr)->sin_addr,
+ nbuf, sizeof(nbuf)),
+ ntohs(((struct sockaddr_in *)addr)->sin_port));
+ c->chan.peer_name = loc_strdup(name);
+}
+
+
+static void lws_server_exit(void * x) {
+ ServerWS * s = (ServerWS *) x;
+ if (s->exiting) return;
+ s->exiting = 1;
+ list_remove(&s->serv.servlink);
+ if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root))
+ shutdown_set_stopped(&channel_shutdown);
+ list_remove(&s->servlink);
+ peer_server_free(s->serv.ps);
+}
+
+static void server_close(ChannelServer * serv) {
+ /* ServerWS * s = server2ws(serv);*/
+
+ assert(is_dispatch_thread());
+ /* Closing a running server is not supported for now. This is
+ * because there is no way (AFAIK) to close a virtual host in
+ * libwebsockets library.*/
+ /* lws_server_exit(s); */
+}
+
+/* following _wt_ functions are called from a worker thread so caution is required
+ * to keep its operations thread safe
+ */
+
+static ChannelServer * channel_server_create(PeerServer * ps) {
+ ServerWS * si = (ServerWS *)loc_alloc_zero(sizeof *si);
+ si->serv.close = server_close;
+ si->serv.ps = ps;
+ if (server_list.next == NULL) {
+ list_init(&server_list);
+ post_event_with_delay(refresh_all_peer_servers, NULL, PEER_DATA_REFRESH_PERIOD * 1000000);
+ }
+ list_add_last(&si->serv.servlink, &channel_server_root);
+ shutdown_set_normal(&channel_shutdown);
+ list_add_last(&si->servlink, &server_list);
+ refresh_peer_server(dummy_socket, ps);
+ return &si->serv;
+}
+
+/* LWS service thread */
+
+static void * lws_service_thread(void * x)
+{
+ struct lws_context_creation_info context_creation_info;
+ int vhost_created = 0;
+
+ memset(&context_creation_info, 0, sizeof context_creation_info);
+ context_creation_info.gid = -1;
+ context_creation_info.uid = -1;
+ context_creation_info.max_http_header_pool = MAX_CONN_REQUESTS;
+ context_creation_info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT|LWS_SERVER_OPTION_EXPLICIT_VHOSTS;
+ context_creation_info.server_string = "tcf";
+
+ lws_ctx = lws_create_context(&context_creation_info);
+ if (lws_ctx == NULL) {
+ trace (LOG_ALWAYS, "Error creating libwebsockets client context: %s", strerror(errno));
+ return NULL;
+ }
+
+ while (1) {
+ if (vhost_created) {
+#if ENABLE_Epoll
+ int i, n;
+ struct epoll_event events[16];
+ n = epoll_wait (epoll_fd, events, 16, 1000);
+ if (n == 0) {
+ lws_service_fd(lws_ctx, NULL);
+ }
+ else {
+ for (i = 0; i < n; i++) {
+ struct lws_pollfd pollfd;
+ if (events[i].data.fd == dummy_pipe_fds[0]) {
+ char buf;
+ /* consume events; it will be handled at the end of the loop */
+ if (read(events[i].data.fd, &buf, 1));
+ continue;
+ }
+ pollfd.fd = events[i].data.fd;
+ EPOLL_TO_POLL_EVENT(pollfd.events, epoll_events[events[i].data.fd]);
+ EPOLL_TO_POLL_EVENT(pollfd.revents, events[i].events);
+ if (lws_service_fd(lws_ctx, &pollfd) < 0) continue;
+ }
+ }
+#else
+ lws_service(lws_ctx, 1000);
+#endif
+ }
+ else {
+ usleep (10000);
+ }
+ pthread_mutex_lock(&lws_list_mutex);
+
+ /* The libwebsockets library only allows up to MAX_CONN_REQUESTS simultaneous
+ * connection requests (this is a parameter provided when creating the context;
+ * exceeding this number of pending requests can leads to crashes in the
+ * libwebsockets library.
+ */
+
+ while (!list_is_empty(&server_create_list)) {
+ struct lws_context_creation_info vhost_creation_info;
+ ServerCreateInfo * args = link2sci(server_create_list.next);
+ struct lws_vhost *lws_vh;
+ ServerWS ** si;
+ memset(&vhost_creation_info, 0, sizeof vhost_creation_info);
+ vhost_creation_info.port = args->port;
+ vhost_creation_info.protocols = protocols;
+ vhost_creation_info.options = args->options;
+ vhost_creation_info.ssl_cipher_list = "AES128-SHA256";
+
+ /* Currently, the certificate is shared for all connections;
+ * this is specified for the first created connection and used
+ * for subsequent ones. This would need some rework to either
+ * make this a global property of the agent (instead of a per
+ * connection) making this more natural or we should fix the
+ * libwebsockets code to move this info to the connection info level
+ * instead of the context/vhost level.
+ */
+ vhost_creation_info.ssl_cert_filepath = args->certificate;
+ vhost_creation_info.ssl_private_key_filepath = args->key;
+ vhost_creation_info.ssl_ca_filepath = args->ca_filepath;
+ vhost_creation_info.ssl_cipher_list = args->cipher_list;
+
+ list_remove(&args->link);
+ pthread_mutex_unlock(&lws_list_mutex);
+ if ((lws_vh = lws_create_vhost(lws_ctx, &vhost_creation_info)) == NULL) {
+ trace (LOG_ALWAYS, "Error creating libwebsockets vhost: %s", strerror(errno));
+ post_event(lws_server_exit, args->si);
+ }
+ else {
+ vhost_created = 1;
+ si = (ServerWS **)lws_protocol_vh_priv_zalloc(lws_vh, protocols, sizeof(ChannelServer *));
+ *si = args->si;
+ }
+ loc_free(args);
+ pthread_mutex_lock(&lws_list_mutex);
+ }
+
+ while (pending_connections < MAX_CONN_REQUESTS && !list_is_empty(&client_connect_list)) {
+ struct lws_client_connect_info client_connect_info;
+ ChannelConnectInfo * args = link2cci(client_connect_list.next);
+ WSIUserData * userdata = (WSIUserData *) loc_alloc_zero(sizeof (WSIUserData));
+
+ if (client_vhost == NULL) {
+ struct lws_context_creation_info vhost_creation_info;
+ memset(&vhost_creation_info, 0, sizeof vhost_creation_info);
+ vhost_creation_info.port = CONTEXT_PORT_NO_LISTEN;
+ vhost_creation_info.protocols = protocols;
+ vhost_creation_info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+ vhost_creation_info.ssl_ca_filepath = args->ca_filepath;
+ vhost_creation_info.ssl_cert_filepath = args->certificate;
+ vhost_creation_info.ssl_private_key_filepath = args->key;
+ vhost_creation_info.ssl_cipher_list = args->cipher_list;
+
+ client_vhost = lws_create_vhost(lws_ctx, &vhost_creation_info);
+ }
+ if (client_vhost == NULL) {
+ trace (LOG_ALWAYS, "Error creating libwebsockets client context: %s", strerror(errno));
+ }
+ else vhost_created = 1;
+
+ list_remove(&args->link);
+ pending_connections++;
+ pthread_mutex_unlock(&lws_list_mutex);
+
+ userdata->args = args;
+
+ memset(&client_connect_info, 0, sizeof(client_connect_info));
+ client_connect_info.context = lws_ctx;
+ client_connect_info.path = args->get_url;
+ if (args->is_ssl) {
+ client_connect_info.ssl_connection = args->self_signed ? 2 : 1;
+ }
+ client_connect_info.address = args->host;
+ client_connect_info.host = client_connect_info.address;
+ client_connect_info.port = args->port;
+ client_connect_info.origin = client_connect_info.address;
+ client_connect_info.ietf_version_or_minus_one = -1;
+ client_connect_info.client_exts = exts;
+ client_connect_info.userdata = userdata;
+ client_connect_info.vhost = client_vhost;
+ client_connect_info.protocol = "tcf";
+
+ if (lws_client_connect_via_info(&client_connect_info));
+ pthread_mutex_lock(&lws_list_mutex);
+ }
+ pthread_mutex_unlock(&lws_list_mutex);
+ }
+ return NULL;
+}
+
+static ChannelServer * channel_lws_server(PeerServer * ps) {
+ const char * port_str = peer_server_getprop(ps, "Port", NULL);
+ const char * host = peer_server_getprop(ps, "Host", NULL);
+ const char * certificate = peer_server_getprop(ps, "Cert", NULL);
+ const char * key = peer_server_getprop(ps, "Key", NULL);
+ const char * client_cert = peer_server_getprop(ps, "ReqClientCert", NULL);
+ ChannelServer * server;
+ int is_ssl = strcmp(peer_server_getprop(ps, "TransportName", ""), "WSS") == 0;
+ const char * ca_filepath = peer_server_getprop(ps, "CAfile", NULL);
+ const char * cipher_list = peer_server_getprop(ps, "CipherList", NULL);
+ ServerCreateInfo * args;
+ int port = 0;
+
+ if (port_str != NULL) port = atoi(port_str);
+ if (port == 0) {
+ trace(LOG_ALWAYS, "Specifying a port number in WebSocket server URL is mandatory");
+ set_fmt_errno(ERR_OTHER, "Specifying a port number in WebSocket server URL is mandatory");
+ return NULL;
+ }
+ if (host != NULL) {
+ trace(LOG_ALWAYS, "Specifying host in WebSocket server URL is not supported");
+ set_fmt_errno(ERR_OTHER, "Specifying host in WebSocket server URL is not supported");
+ return NULL;
+ }
+ if (is_ssl) {
+ if (key == NULL) {
+ trace(LOG_ALWAYS, "Key parameter needs to be specified for secured WebSocket connection");
+ set_fmt_errno(ERR_OTHER, "Key parameter needs to be specified for secured WebSocket connection");
+ return NULL;
+ }
+ if (certificate == NULL) {
+ trace(LOG_ALWAYS, "Cert parameter needs to be specified for secured WebSocket connection");
+ set_fmt_errno(ERR_OTHER, "Cert parameter needs to be specified for secured WebSocket connection");
+ return NULL;
+ }
+ }
+
+ args = (ServerCreateInfo *)loc_alloc_zero(sizeof(ServerCreateInfo));
+ args->port = port_str ? atoi(port_str) : 0;
+ if (is_ssl) {
+ args->certificate = certificate;
+ args->key = key;
+ args->ca_filepath = ca_filepath;
+ args->cipher_list = cipher_list;
+ args->options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+ if (client_cert
+ && (strcmp(client_cert, "true") == 0
+ || strcmp(client_cert, "1") == 0)) {
+ args->options |= LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT;
+ }
+ }
+ server = channel_server_create(ps);
+ args->si = server2ws(server);
+ args->si->is_ssl = is_ssl;
+ pthread_mutex_lock(&lws_list_mutex);
+ list_add_last(&args->link, &server_create_list);
+#if ENABLE_Epoll
+ channel_lws_abort_epoll();
+#else
+ lws_cancel_service(lws_ctx);
+#endif
+ pthread_mutex_unlock(&lws_list_mutex);
+ return server;
+}
+
+static void server_lws_connect_done(void * x) {
+ SessionData * data = (SessionData *)x;
+ ServerWS * si = data->si;
+ ChannelWS * c;
+ assert (si != NULL);
+ pthread_mutex_lock(&data->mutex);
+ if (data->wsi == NULL) {
+ pthread_mutex_unlock(&data->mutex);
+ return;
+ }
+ c= create_channel(si->is_ssl, 1);
+ data->c = c;
+ c->data = data;
+ lws_lock(&c->chan);
+ pthread_mutex_unlock(&data->mutex);
+ set_peer_addr(c, data->addr_buf, data->addr_len);
+ si->serv.new_conn(&si->serv, &c->chan);
+}
+
+static void channel_lws_connect_done(void * x) {
+ ChannelConnectInfo * args = (ChannelConnectInfo *)x;
+ pthread_mutex_lock(&lws_list_mutex);
+ pending_connections--;
+ pthread_mutex_unlock(&lws_list_mutex);
+
+ if (args->error) {
+ if (args->error == EPERM) {
+ args->error = set_fmt_errno(ERR_OTHER, "Failed to handshake the connection h:%s p:%d", args->host, args->port);
+ }
+ else if (args->error == ECONNREFUSED) {
+ args->error = set_fmt_errno(ERR_OTHER, "Failed to establish connection h:%s p:%d", args->host, args->port);
+ }
+ args->callback(args->callback_args, args->error, NULL);
+ loc_free(args->data);
+ }
+ else {
+ ChannelWS * c = create_channel(args->is_ssl, 0);
+ if (c == NULL) {
+ args->callback(args->callback_args, errno, NULL);
+ loc_free(args->data);
+ }
+ else {
+ args->data->c = c;
+ c->data = args->data;
+ lws_lock(&c->chan);
+ set_peer_addr(c, args->data->addr_buf, args->data->addr_len);
+ args->callback(args->callback_args, 0, &c->chan);
+ }
+ }
+ loc_free(args->host);
+ loc_free(args->get_url);
+ loc_free(args->ca_filepath);
+ loc_free(args->certificate);
+ loc_free(args->cipher_list);
+ loc_free(args->key);
+ loc_free(args);
+}
+
+static void channel_lws_connect(PeerServer * ps, ChannelConnectCallBack callback, void * callback_args) {
+ const char * host = peer_server_getprop(ps, "Host", NULL);
+ const char * port_str = peer_server_getprop(ps, "Port", NULL);
+ const char * get_url = peer_server_getprop(ps, "GetUrl", NULL);
+ const char * self_signed = peer_server_getprop(ps, "SelfSigned", NULL);
+ const char * ca_filepath = peer_server_getprop(ps, "CAfile", NULL);
+ const char * cipher_list = peer_server_getprop(ps, "CipherList", NULL);
+
+ const char * certificate = peer_server_getprop(ps, "Cert", NULL);
+ const char * key = peer_server_getprop(ps, "Key", NULL);
+ ChannelConnectInfo * args;
+
+ args = (ChannelConnectInfo *)loc_alloc_zero(sizeof(ChannelConnectInfo));
+ args->callback = callback;
+ args->callback_args = callback_args;
+ args->is_ssl = strcmp(peer_server_getprop(ps, "TransportName", ""), "WSS") == 0;
+ if (port_str != NULL) args->port = atoi(port_str);
+ args->ca_filepath = (ca_filepath != NULL ? loc_strdup(ca_filepath) : NULL);
+ args->certificate = (certificate != NULL ? loc_strdup(certificate) : NULL);
+ args->cipher_list = (cipher_list != NULL ? loc_strdup(cipher_list) : NULL);
+ args->key = (key != NULL ? loc_strdup(key) : NULL);
+ args->host = loc_strdup(host == NULL ? "127.0.0.1" : host);
+ args->get_url = loc_strdup(get_url == NULL ? "/" : get_url);
+ args->self_signed = self_signed ? (strcmp(self_signed, "true") == 0 || strcmp(self_signed, "1") == 0) : 0;
+ pthread_mutex_lock(&lws_list_mutex);
+ list_add_last(&args->link, &client_connect_list);
+#if ENABLE_Epoll
+ channel_lws_abort_epoll();
+#else
+ lws_cancel_service(lws_ctx);
+#endif
+ pthread_mutex_unlock(&lws_list_mutex);
+}
+
+static void trace_lws(int level, const char *line) {
+ trace(LOG_PROTOCOL, line);
+}
+
+void channel_lws_get_properties(Channel * c, char *** prop_names, char *** prop_values, unsigned * prop_cnt) {
+ ChannelWS * c1 = (ChannelWS *)c;
+ assert(is_dispatch_thread());
+ if(c1->magic != CHANNEL_MAGIC) {
+ *prop_cnt = 0;
+ return;
+ }
+ assert(c1->lock_cnt > 0);
+ *prop_names = ((ChannelWS *)c1)->data->prop_names;
+ *prop_values = ((ChannelWS *)c1)->data->prop_values;
+ *prop_cnt = ((ChannelWS *)c1)->data->prop_cnt;
+}
+
+void ini_lws() {
+ static int initialized = 0;
+ pthread_t thread;
+#if ENABLE_Epoll
+ struct epoll_event event;
+ assert (POLLIN == EPOLLIN);
+ assert (POLLPRI == EPOLLPRI);
+ assert (POLLOUT == EPOLLOUT );
+ assert (POLLRDHUP ==EPOLLRDHUP);
+ assert (POLLERR == EPOLLERR);
+ assert (POLLHUP == EPOLLHUP);
+#endif
+
+ if (initialized) return;
+#if ENABLE_Epoll
+ epoll_fd = epoll_create1(0);
+ epoll_events = (uint32_t *)loc_alloc_zero(getdtablesize() * sizeof(uint32_t));
+ if (pipe(dummy_pipe_fds) < 0) check_error(errno);
+ memset(&event, 0, sizeof(struct epoll_event));
+ event.data.fd = dummy_pipe_fds[0];
+ event.events = EPOLLIN;
+ epoll_ctl (epoll_fd, EPOLL_CTL_ADD, event.data.fd, &event);
+#endif
+ if ((log_mode & LOG_PROTOCOL) == 0) lws_set_log_level (0, NULL);
+ else lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO, trace_lws);
+ add_channel_transport("WS", channel_lws_server, channel_lws_connect);
+ add_channel_transport("WSS", channel_lws_server, channel_lws_connect);
+
+ dummy_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP);
+ pthread_mutex_init(&lws_list_mutex, NULL);
+ pthread_create(&thread, NULL, lws_service_thread, NULL);
+ }
+#endif /* ENABLE_LibWebSockets */

Back to the top