Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEugene Tarassov2011-11-11 01:19:02 +0000
committerEugene Tarassov2011-11-11 01:19:02 +0000
commita107edc216f94c5ec059fdf74541908bc7e979bb (patch)
treea0d9a448043cf5b969c2ede059b7d19a3aa64b71 /agent/tcf/framework/channel_pipe.c
parent07c7a005cded9e4b4ec842d756d0e4d79dade149 (diff)
downloadorg.eclipse.tcf.agent-a107edc216f94c5ec059fdf74541908bc7e979bb.tar.gz
org.eclipse.tcf.agent-a107edc216f94c5ec059fdf74541908bc7e979bb.tar.xz
org.eclipse.tcf.agent-a107edc216f94c5ec059fdf74541908bc7e979bb.zip
Agent code is moved into separate "agent" directory, all C code moved into "tcf" directory.
Diffstat (limited to 'agent/tcf/framework/channel_pipe.c')
-rw-r--r--agent/tcf/framework/channel_pipe.c772
1 files changed, 772 insertions, 0 deletions
diff --git a/agent/tcf/framework/channel_pipe.c b/agent/tcf/framework/channel_pipe.c
new file mode 100644
index 00000000..1a74d9e1
--- /dev/null
+++ b/agent/tcf/framework/channel_pipe.c
@@ -0,0 +1,772 @@
+/*******************************************************************************
+ * Copyright (c) 2010 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ * You may elect to redistribute this code under either of these licenses.
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+
+/*
+ * Implements input and output stream over named pipe transport.
+ */
+
+#include <config.h>
+
+#if defined(WIN32)
+
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
+#include <framework/channel.h>
+#include <framework/channel_pipe.h>
+#include <framework/myalloc.h>
+#include <framework/protocol.h>
+#include <framework/errors.h>
+#include <framework/events.h>
+#include <framework/exceptions.h>
+#include <framework/trace.h>
+#include <framework/json.h>
+#include <framework/peer.h>
+#include <framework/asyncreq.h>
+#include <framework/inputbuf.h>
+#include <framework/outputbuf.h>
+
+#define BUF_SIZE (128 * MEM_USAGE_FACTOR)
+#define CHANNEL_MAGIC 0x52376532
+#define SERVER_INSTANCE_CNT 16
+#define DEFAULT_PIPE_NAME "TCF-Agent"
+#define DEFAULT_PIPE_DIR "//./pipe/"
+
+static const char * def_pipe_name = DEFAULT_PIPE_DIR DEFAULT_PIPE_NAME;
+static const char * attr_pipe_name = "PipeName";
+
+typedef struct ChannelPIPE ChannelPIPE;
+typedef struct ServerPIPE ServerPIPE;
+typedef struct ServerInstance ServerInstance;
+
+struct ChannelPIPE {
+ Channel chan; /* Public channel information - must be first */
+ int magic; /* Magic number */
+ int lock_cnt; /* Stream lock count, when > 0 channel cannot be deleted */
+ int fd_inp;
+ int fd_out;
+ ServerInstance * server;
+
+ /* Input stream buffer */
+ InputBuf ibuf;
+
+ /* Output stream state */
+ int out_flush_cnt;
+ unsigned char obuf[BUF_SIZE];
+ OutputQueue out_queue;
+ AsyncReqInfo out_req;
+
+ /* Async read request */
+ AsyncReqInfo rd_req;
+ int read_pending;
+};
+
+struct ServerInstance {
+ ServerPIPE * server;
+ int index;
+ int fd_inp;
+ int fd_out;
+#if defined(WIN32) && !defined(__CYGWIN__)
+ HANDLE pipe;
+ AsyncReqInfo req;
+#endif
+};
+
+struct ServerPIPE {
+ ChannelServer serv;
+ PeerServer * ps;
+ LINK servlink;
+ ServerInstance arr[SERVER_INSTANCE_CNT];
+};
+
+#define channel2pipe(A) ((ChannelPIPE *)((char *)(A) - offsetof(ChannelPIPE, chan)))
+#define inp2channel(A) ((Channel *)((char *)(A) - offsetof(Channel, inp)))
+#define out2channel(A) ((Channel *)((char *)(A) - offsetof(Channel, out)))
+#define server2pipe(A) ((ServerPIPE *)((char *)(A) - offsetof(ServerPIPE, serv)))
+#define servlink2pipe(A) ((ServerPIPE *)((char *)(A) - offsetof(ServerPIPE, servlink)))
+#define ibuf2pipe(A) ((ChannelPIPE *)((char *)(A) - offsetof(ChannelPIPE, ibuf)))
+#define obuf2pipe(A) ((ChannelPIPE *)((char *)(A) - offsetof(ChannelPIPE, out_queue)))
+
+static LINK server_list;
+static void pipe_read_done(void * x);
+static void handle_channel_msg(void * x);
+
+static void close_input_pipe(ChannelPIPE * c);
+static void close_output_pipe(ChannelPIPE * c);
+
+static void delete_channel(ChannelPIPE * c) {
+ trace(LOG_PROTOCOL, "Deleting channel %#lx", c);
+ assert(c->lock_cnt == 0);
+ assert(c->out_flush_cnt == 0);
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->read_pending == 0);
+ assert(c->ibuf.handling_msg != HandleMsgTriggered);
+ assert(output_queue_is_empty(&c->out_queue));
+ output_queue_clear(&c->out_queue);
+ channel_clear_broadcast_group(&c->chan);
+ close_input_pipe(c);
+ c->magic = 0;
+ loc_free(c->ibuf.buf);
+ loc_free(c->chan.peer_name);
+ loc_free(c);
+}
+
+static void pipe_lock(Channel * channel) {
+ ChannelPIPE * c = channel2pipe(channel);
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ c->lock_cnt++;
+}
+
+static void pipe_unlock(Channel * channel) {
+ ChannelPIPE * c = channel2pipe(channel);
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->lock_cnt > 0);
+ c->lock_cnt--;
+ if (c->lock_cnt == 0) {
+ assert(!c->read_pending);
+ delete_channel(c);
+ }
+}
+
+static int pipe_is_closed(Channel * channel) {
+ ChannelPIPE * c = channel2pipe(channel);
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->lock_cnt > 0);
+ return c->chan.state == ChannelStateDisconnected;
+}
+
+static void done_write_request(void * args) {
+ ChannelPIPE * c = (ChannelPIPE *)((AsyncReqInfo *)args)->client_data;
+ int size = 0;
+ int error = 0;
+
+ assert(args == &c->out_req);
+ if (c->out_req.u.fio.rval < 0) error = c->out_req.error;
+ else size = c->out_req.u.fio.rval;
+ output_queue_done(&c->out_queue, error, size);
+
+ if (output_queue_is_empty(&c->out_queue) &&
+ c->chan.state == ChannelStateDisconnected) close_output_pipe(c);
+
+ pipe_unlock(&c->chan);
+}
+
+static void post_write_request(OutputBuffer * bf) {
+ ChannelPIPE * c = obuf2pipe(bf->queue);
+ c->out_req.client_data = c;
+ c->out_req.done = done_write_request;
+ c->out_req.type = AsyncReqWrite;
+ c->out_req.u.fio.fd = c->fd_out;
+ c->out_req.u.fio.bufp = bf->buf + bf->buf_pos;
+ c->out_req.u.fio.bufsz = bf->buf_len - bf->buf_pos;
+ async_req_post(&c->out_req);
+ pipe_lock(&c->chan);
+}
+
+static void create_write_request(ChannelPIPE * c, const void * buf, size_t size) {
+ if (c->chan.state == ChannelStateDisconnected) return;
+ c->out_queue.post_io_request = post_write_request;
+ output_queue_add(&c->out_queue, buf, size);
+}
+
+static void pipe_flush(ChannelPIPE * c) {
+ unsigned char * p = c->obuf;
+ unsigned char * e = c->chan.out.cur;
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->chan.out.end == p + sizeof(c->obuf));
+ if (e == p) return;
+ assert(e >= p && e <= p + sizeof(c->obuf));
+ create_write_request(c, p, e - p);
+ c->chan.out.cur = p;
+}
+
+static void pipe_flush_event(void * x) {
+ ChannelPIPE * c = (ChannelPIPE *)x;
+ assert(c->magic == CHANNEL_MAGIC);
+ if (--c->out_flush_cnt == 0) {
+ int congestion_level = c->chan.congestion_level;
+ if (congestion_level > 0) usleep(congestion_level * 2500);
+ pipe_flush(c);
+ pipe_unlock(&c->chan);
+ }
+}
+
+static void pipe_write_stream(OutputStream * out, int byte) {
+ ChannelPIPE * c = channel2pipe(out2channel(out));
+ assert(c->magic == CHANNEL_MAGIC);
+ if (c->chan.state == ChannelStateDisconnected) return;
+ if (c->chan.out.cur == c->chan.out.end) pipe_flush(c);
+ if (byte < 0 || byte == ESC) {
+ char esc = 0;
+ *c->chan.out.cur++ = ESC;
+ if (byte == ESC) esc = 0;
+ else if (byte == MARKER_EOM) esc = 1;
+ else if (byte == MARKER_EOS) esc = 2;
+ else assert(0);
+ if (c->chan.state == ChannelStateDisconnected) return;
+ if (c->chan.out.cur == c->chan.out.end) pipe_flush(c);
+ *c->chan.out.cur++ = esc;
+ if (byte == MARKER_EOM && c->out_flush_cnt < 8) {
+ if (c->out_flush_cnt++ == 0) pipe_lock(&c->chan);
+ post_event(pipe_flush_event, c);
+ }
+ return;
+ }
+ *c->chan.out.cur++ = (char)byte;
+}
+
+static void pipe_write_block_stream(OutputStream * out, const char * bytes, size_t size) {
+ size_t cnt = 0;
+ ChannelPIPE * c = channel2pipe(out2channel(out));
+
+ if (out->supports_zero_copy && size > 32) {
+ /* Send the binary data escape seq */
+ size_t n = size;
+ if (c->chan.out.cur >= c->chan.out.end - 8) pipe_flush(c);
+ *c->chan.out.cur++ = ESC;
+ *c->chan.out.cur++ = 3;
+ for (;;) {
+ if (n <= 0x7fu) {
+ *c->chan.out.cur++ = (char)n;
+ break;
+ }
+ *c->chan.out.cur++ = (n & 0x7fu) | 0x80u;
+ n = n >> 7;
+ }
+ /* We need to flush the buffer then send our data */
+ pipe_flush(c);
+
+ create_write_request(c, bytes, size);
+ return;
+ }
+
+ while (cnt < size) write_stream(out, (unsigned char)bytes[cnt++]);
+}
+
+static ssize_t pipe_splice_block_stream(OutputStream * out, int fd, size_t size, int64_t * offset) {
+ ssize_t rd = 0;
+ char buffer[BUF_SIZE];
+ assert(is_dispatch_thread());
+ if (size == 0) return 0;
+ if (size > BUF_SIZE) size = BUF_SIZE;
+ if (offset != NULL) {
+ rd = pread(fd, buffer, size, (off_t)*offset);
+ if (rd > 0) *offset += rd;
+ }
+ else {
+ rd = read(fd, buffer, size);
+ }
+ if (rd > 0) pipe_write_block_stream(out, buffer, rd);
+ return rd;
+}
+
+static void pipe_post_read(InputBuf * ibuf, unsigned char * buf, size_t size) {
+ ChannelPIPE * c = ibuf2pipe(ibuf);
+
+ if (c->read_pending) return;
+ c->read_pending = 1;
+ c->rd_req.u.fio.bufp = buf;
+ c->rd_req.u.fio.bufsz = size;
+ async_req_post(&c->rd_req);
+}
+
+static void pipe_wait_read(InputBuf * ibuf) {
+ ChannelPIPE * c = ibuf2pipe(ibuf);
+
+ /* Wait for read to complete */
+ assert(c->lock_cnt > 0);
+ assert(c->read_pending != 0);
+ cancel_event(pipe_read_done, &c->rd_req, 1);
+ pipe_read_done(&c->rd_req);
+}
+
+static int pipe_read_stream(InputStream * inp) {
+ Channel * channel = inp2channel(inp);
+ ChannelPIPE * c = channel2pipe(channel);
+
+ assert(c->lock_cnt > 0);
+ if (inp->cur < inp->end) return *inp->cur++;
+ return ibuf_get_more(&c->ibuf, 0);
+}
+
+static int pipe_peek_stream(InputStream * inp) {
+ Channel * channel = inp2channel(inp);
+ ChannelPIPE * c = channel2pipe(channel);
+
+ assert(c->lock_cnt > 0);
+ if (inp->cur < inp->end) return *inp->cur;
+ return ibuf_get_more(&c->ibuf, 1);
+}
+
+static void send_eof_and_close(Channel * channel, int err) {
+ ChannelPIPE * c = channel2pipe(channel);
+
+ assert(c->magic == CHANNEL_MAGIC);
+ if (channel->state == ChannelStateDisconnected) return;
+ ibuf_flush(&c->ibuf);
+ if (c->ibuf.handling_msg == HandleMsgTriggered) {
+ /* Cancel pending message handling */
+ cancel_event(handle_channel_msg, c, 0);
+ c->ibuf.handling_msg = HandleMsgIdle;
+ }
+ write_stream(&c->chan.out, MARKER_EOS);
+ write_errno(&c->chan.out, err);
+ write_stream(&c->chan.out, MARKER_EOM);
+ pipe_flush(c);
+ pipe_post_read(&c->ibuf, c->obuf, sizeof(c->obuf));
+ c->chan.state = ChannelStateDisconnected;
+ if (output_queue_is_empty(&c->out_queue)) close_output_pipe(c);
+ notify_channel_closed(channel);
+ if (channel->disconnected) {
+ channel->disconnected(channel);
+ }
+ else {
+ trace(LOG_PROTOCOL, "channel %#lx disconnected", c);
+ protocol_release(channel->protocol);
+ }
+ channel->protocol = NULL;
+}
+
+static void handle_channel_msg(void * x) {
+ Trap trap;
+ ChannelPIPE * c = (ChannelPIPE *)x;
+ int has_msg;
+
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->ibuf.handling_msg == HandleMsgTriggered);
+ assert(c->ibuf.message_count);
+
+ has_msg = ibuf_start_message(&c->ibuf);
+ if (has_msg <= 0) {
+ if (has_msg < 0 && c->chan.state != ChannelStateDisconnected) {
+ trace(LOG_PROTOCOL, "Pipe is close by remote peer, channel %#lx %s", c, c->chan.peer_name);
+ channel_close(&c->chan);
+ }
+ }
+ else if (set_trap(&trap)) {
+ if (c->chan.receive) {
+ c->chan.receive(&c->chan);
+ }
+ else {
+ handle_protocol_message(&c->chan);
+ }
+ clear_trap(&trap);
+ }
+ else {
+ trace(LOG_ALWAYS, "Exception in message handler: %d %s",
+ trap.error, errno_to_str(trap.error));
+ send_eof_and_close(&c->chan, trap.error);
+ }
+}
+
+static void channel_check_pending(Channel * channel) {
+ ChannelPIPE * c = channel2pipe(channel);
+
+ assert(is_dispatch_thread());
+ if (c->ibuf.handling_msg == HandleMsgIdle && c->ibuf.message_count) {
+ post_event(handle_channel_msg, c);
+ c->ibuf.handling_msg = HandleMsgTriggered;
+ }
+}
+
+static void pipe_trigger_message(InputBuf * ibuf) {
+ ChannelPIPE * c = ibuf2pipe(ibuf);
+
+ assert(is_dispatch_thread());
+ assert(c->ibuf.message_count > 0);
+ if (c->ibuf.handling_msg == HandleMsgIdle) {
+ post_event(handle_channel_msg, c);
+ c->ibuf.handling_msg = HandleMsgTriggered;
+ }
+}
+
+static int channel_get_message_count(Channel * channel) {
+ ChannelPIPE * c = channel2pipe(channel);
+ assert(is_dispatch_thread());
+ if (c->ibuf.handling_msg != HandleMsgTriggered) return 0;
+ return c->ibuf.message_count;
+}
+
+static void pipe_read_done(void * x) {
+ AsyncReqInfo * req = (AsyncReqInfo *)x;
+ ChannelPIPE * c = (ChannelPIPE *)req->client_data;
+ int len = 0;
+
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->read_pending != 0);
+ assert(c->lock_cnt > 0);
+ c->read_pending = 0;
+ len = c->rd_req.u.fio.rval;
+ if (req->error) {
+ if (c->chan.state != ChannelStateDisconnected) {
+ trace(LOG_ALWAYS, "Can't read from pipe: %s", errno_to_str(req->error));
+ }
+ len = 0; /* Treat error as eof */
+ }
+ if (c->chan.state != ChannelStateDisconnected) {
+ ibuf_read_done(&c->ibuf, len);
+ }
+ else if (len > 0) {
+ pipe_post_read(&c->ibuf, c->obuf, sizeof(c->obuf));
+ }
+ else {
+ pipe_unlock(&c->chan);
+ }
+}
+
+static void start_channel(Channel * channel) {
+ ChannelPIPE * c = channel2pipe(channel);
+
+ assert(is_dispatch_thread());
+ assert(c->magic == CHANNEL_MAGIC);
+ assert(c->fd_inp >= 0);
+ assert(c->fd_out >= 0);
+ if (c->chan.connecting) {
+ c->chan.connecting(&c->chan);
+ }
+ else {
+ trace(LOG_PROTOCOL, "channel server connecting");
+ send_hello_message(&c->chan);
+ }
+ ibuf_trigger_read(&c->ibuf);
+}
+
+static ChannelPIPE * create_channel(int fd_inp, int fd_out, ServerInstance * server) {
+ ChannelPIPE * c = NULL;
+
+ assert(fd_inp >= 0);
+ assert(fd_out >= 0);
+
+ c = (ChannelPIPE *)loc_alloc_zero(sizeof *c);
+ c->magic = CHANNEL_MAGIC;
+ c->chan.inp.read = pipe_read_stream;
+ c->chan.inp.peek = pipe_peek_stream;
+ c->chan.out.cur = c->obuf;
+ c->chan.out.end = c->obuf + sizeof(c->obuf);
+ c->chan.out.write = pipe_write_stream;
+ c->chan.out.write_block = pipe_write_block_stream;
+ c->chan.out.splice_block = pipe_splice_block_stream;
+ c->chan.state = ChannelStateStartWait;
+ c->chan.start_comm = start_channel;
+ c->chan.check_pending = channel_check_pending;
+ c->chan.message_count = channel_get_message_count;
+ c->chan.lock = pipe_lock;
+ c->chan.unlock = pipe_unlock;
+ c->chan.is_closed = pipe_is_closed;
+ c->chan.close = send_eof_and_close;
+ ibuf_init(&c->ibuf, &c->chan.inp);
+ c->ibuf.post_read = pipe_post_read;
+ c->ibuf.wait_read = pipe_wait_read;
+ c->ibuf.trigger_message = pipe_trigger_message;
+ c->fd_inp = fd_inp;
+ c->fd_out = fd_out;
+ c->lock_cnt = 1;
+ c->server = server;
+ c->rd_req.done = pipe_read_done;
+ c->rd_req.client_data = c;
+ c->rd_req.type = AsyncReqRead;
+ c->rd_req.u.fio.fd = fd_inp;
+ output_queue_ini(&c->out_queue);
+ return c;
+}
+
+static void set_peer_name(ChannelPIPE * c) {
+ /* Create a human readable channel name that uniquely identifies remote peer */
+ char name[256];
+ static int pipe_cnt = 0;
+ snprintf(name, sizeof(name), "PIPE:%d", pipe_cnt++);
+ c->chan.peer_name = loc_strdup(name);
+}
+
+typedef struct ChannelConnectInfo {
+ ChannelConnectCallBack callback;
+ void * callback_args;
+ int error;
+ int fd_inp;
+ int fd_out;
+} ChannelConnectInfo;
+
+static void channel_pipe_connect_done(void * args) {
+ ChannelConnectInfo * info = (ChannelConnectInfo *)args;
+ if (info->error) {
+ info->callback(info->callback_args, info->error, NULL);
+ }
+ else {
+ ChannelPIPE * c = create_channel(info->fd_inp, info->fd_out, 0);
+ set_peer_name(c);
+ info->callback(info->callback_args, 0, &c->chan);
+ }
+ loc_free(info);
+}
+
+void channel_pipe_connect(PeerServer * ps, ChannelConnectCallBack callback, void * callback_args) {
+ const char * path = peer_server_getprop(ps, attr_pipe_name, def_pipe_name);
+ ChannelConnectInfo * info = (ChannelConnectInfo *)loc_alloc_zero(sizeof(ChannelConnectInfo));
+ char out_path[FILE_PATH_SIZE];
+
+ info->fd_out = -1;
+ info->fd_inp = open(path, O_BINARY | O_RDONLY, 0);
+ if (info->fd_inp < 0) info->error = errno;
+
+ if (!info->error) {
+ int l = read(info->fd_inp, out_path, sizeof(out_path) - 1);
+ if (l < 0) info->error = errno;
+ else out_path[l] = 0;
+ }
+
+ if (!info->error) {
+ info->fd_out = open(out_path, O_BINARY | O_WRONLY, 0);
+ if (info->fd_out < 0) info->error = errno;
+ }
+
+ if (info->error) {
+ if (info->fd_inp >= 0) close(info->fd_inp);
+ if (info->fd_out >= 0) close(info->fd_out);
+ info->fd_inp = -1;
+ info->fd_out = -1;
+ }
+
+ info->callback = callback;
+ info->callback_args = callback_args;
+ post_event(channel_pipe_connect_done, info);
+}
+
+#if defined(WIN32) && !defined(__CYGWIN__)
+
+#define check_error_win32(ok) { if (!(ok)) check_error(set_win32_errno(GetLastError())); }
+
+static void pipe_client_connected(void * args) {
+ AsyncReqInfo * req = (AsyncReqInfo *)args;
+ ServerInstance * ins = (ServerInstance *)req->client_data;
+ int error = 0;
+
+ assert(req == &ins->req);
+
+ if (req->error) error = req->error;
+
+ if (!error) {
+ int l = 0;
+ HANDLE h = NULL;
+ OVERLAPPED overlap;
+ char inp_path[FILE_PATH_SIZE];
+ const char * path = peer_server_getprop(ins->server->ps, attr_pipe_name, def_pipe_name);
+ static unsigned pipe_cnt = 0;
+
+ memset(&overlap, 0, sizeof(overlap));
+ snprintf(inp_path, sizeof(inp_path), "%s-%u", path, pipe_cnt++);
+ l = strlen(inp_path) + 1;
+ h = CreateNamedPipe(inp_path, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
+ 1, BUF_SIZE, BUF_SIZE, 0, NULL);
+ if (h == INVALID_HANDLE_VALUE) error = set_win32_errno(GetLastError());
+ if (!error) {
+ ins->fd_inp = _open_osfhandle((intptr_t)h, O_BINARY | O_RDONLY);
+ if (ins->fd_inp < 0) error = errno;
+ }
+ if (!error) {
+ overlap.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (overlap.hEvent == NULL) error = set_win32_errno(GetLastError());
+ }
+ if (!error) {
+ if (ConnectNamedPipe(h, &overlap)) {
+ error = ERR_PROTOCOL;
+ }
+ else {
+ DWORD e = GetLastError();
+ if (e != ERROR_IO_PENDING) error = set_win32_errno(GetLastError());
+ }
+ }
+ if (!error) {
+ int wr = write(ins->fd_out, inp_path, l);
+ if (wr < 0) error = errno;
+ }
+ if (!error) {
+ switch (WaitForSingleObject(overlap.hEvent, 10000)) {
+ case WAIT_OBJECT_0:
+ break;
+ case WAIT_ABANDONED:
+ case WAIT_TIMEOUT:
+ error = ETIMEDOUT;
+ break;
+ default:
+ error = set_win32_errno(GetLastError());
+ break;
+ }
+ }
+ if (!CloseHandle(overlap.hEvent)) error = set_win32_errno(GetLastError());
+ }
+
+ if (error) {
+ trace(LOG_ALWAYS, "Cannot connect pipe: %s", errno_to_str(error));
+ if (ins->fd_inp >= 0) close(ins->fd_inp);
+ ins->fd_inp = -1;
+ DisconnectNamedPipe(ins->pipe);
+ async_req_post(&ins->req);
+ }
+ else {
+ ChannelPIPE * c = create_channel(ins->fd_inp, ins->fd_out, ins);
+ set_peer_name(c);
+ ins->server->serv.new_conn(&ins->server->serv, &c->chan);
+ }
+}
+
+static void close_input_pipe(ChannelPIPE * c) {
+ assert(c->fd_out < 0);
+ assert(c->fd_inp > 0);
+ if (c->server != NULL) {
+ ServerInstance * ins = c->server;
+ close(ins->fd_inp);
+ ins->fd_inp = -1;
+ async_req_post(&ins->req);
+ c->server = NULL;
+ }
+ else {
+ close(c->fd_inp);
+ }
+ c->fd_inp = -1;
+}
+
+static void close_output_pipe(ChannelPIPE * c) {
+ assert(c->fd_out > 0);
+ assert(c->fd_inp > 0);
+ if (c->server != NULL) {
+ ServerInstance * ins = c->server;
+ check_error_win32(DisconnectNamedPipe(ins->pipe));
+ }
+ else {
+ close(c->fd_out);
+ }
+ c->fd_out = -1;
+}
+
+static void register_server(ServerPIPE * s) {
+ int i;
+ PeerServer * ps = s->ps;
+ PeerServer * ps2 = peer_server_alloc();
+ const char * transport = peer_server_getprop(ps, "TransportName", NULL);
+ const char * path = peer_server_getprop(ps, attr_pipe_name, def_pipe_name);
+ char id[256];
+
+ ps2->flags = ps->flags | PS_FLAG_LOCAL | PS_FLAG_DISCOVERABLE;
+ for (i = 0; i < ps->ind; i++) {
+ peer_server_addprop(ps2, loc_strdup(ps->list[i].name), loc_strdup(ps->list[i].value));
+ }
+ i = strlen(DEFAULT_PIPE_DIR);
+ if (strncmp(path, DEFAULT_PIPE_DIR, i) != 0) i = 0;
+ snprintf(id, sizeof(id), "%s:%s", transport, path + i);
+ for (i = 0; id[i]; i++) {
+ /* Character '/' is prohibited in a peer ID string */
+ if (id[i] == '/') id[i] = '|';
+ }
+ peer_server_addprop(ps2, loc_strdup("ID"), loc_strdup(id));
+ peer_server_addprop(ps2, loc_strdup(attr_pipe_name), loc_strdup(path));
+ peer_server_add(ps2, ~0u);
+}
+
+#else
+
+static void close_output_pipe(ChannelPIPE * c) {
+}
+
+static void close_input_pipe(ChannelPIPE * c) {
+}
+
+#endif
+
+static void server_close(ChannelServer * serv) {
+ ServerPIPE * s = server2pipe(serv);
+ int i;
+
+ assert(is_dispatch_thread());
+ list_remove(&s->servlink);
+ peer_server_free(s->ps);
+ for (i = 0; i < SERVER_INSTANCE_CNT; i++) {
+ ServerInstance * ins = s->arr + i;
+ if (ins->fd_inp >= 0 && close(ins->fd_inp) < 0) check_error(errno);
+ if (ins->fd_out >= 0 && close(ins->fd_out) < 0) check_error(errno);
+ ins->fd_inp = ins->fd_out = -1;
+#if defined(WIN32) && !defined(__CYGWIN__)
+ ins->pipe = NULL;
+#endif
+ }
+ /* TODO: free 's' when all pending reqs are done */
+}
+
+ChannelServer * channel_pipe_server(PeerServer * ps) {
+ ServerPIPE * s = (ServerPIPE *)loc_alloc_zero(sizeof(ServerPIPE));
+ if (server_list.next == NULL) list_init(&server_list);
+#if defined(WIN32) && !defined(__CYGWIN__)
+ {
+ int i;
+ const char * path = peer_server_getprop(ps, attr_pipe_name, def_pipe_name);
+ for (i = 0; i < SERVER_INSTANCE_CNT; i++) {
+ ServerInstance * ins = s->arr + i;
+ ins->server = s;
+ ins->index = i;
+ ins->pipe = CreateNamedPipe(path, PIPE_ACCESS_OUTBOUND,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
+ SERVER_INSTANCE_CNT, BUF_SIZE, BUF_SIZE, 0, NULL);
+
+ if (ins->pipe == INVALID_HANDLE_VALUE) {
+ set_win32_errno(GetLastError());
+ return NULL;
+ }
+ ins->fd_inp = -1;
+ ins->fd_out = _open_osfhandle((intptr_t)ins->pipe, O_BINARY | O_WRONLY);
+ }
+ s->ps = ps;
+ s->serv.close = server_close;
+ list_add_last(&s->servlink, &server_list);
+ for (i = 0; i < SERVER_INSTANCE_CNT; i++) {
+ ServerInstance * ins = s->arr + i;
+ ins->req.type = AsyncReqConnectPipe;
+ ins->req.client_data = &s->arr[i];
+ ins->req.done = pipe_client_connected;
+ ins->req.u.cnp.pipe = ins->pipe;
+ async_req_post(&ins->req);
+ }
+ register_server(s);
+ return &s->serv;
+ }
+#else
+ s->serv.close = server_close;
+ /* TODO: Unix pipe channel */
+ loc_free(s);
+ errno = ERR_UNSUPPORTED;
+ return NULL;
+#endif
+}
+
+#else
+/* Pipes are not supported */
+#include <framework/errors.h>
+#include <framework/channel_pipe.h>
+
+void channel_pipe_connect(PeerServer * server, ChannelConnectCallBack callback, void * callback_args) {
+ callback(callback_args, ERR_UNSUPPORTED, NULL);
+}
+
+ChannelServer * channel_pipe_server(PeerServer * server) {
+ errno = ERR_UNSUPPORTED;
+ return NULL;
+}
+
+#endif

Back to the top