diff options
author | eutarass | 2010-12-21 20:32:53 +0000 |
---|---|---|
committer | eutarass | 2010-12-21 20:32:53 +0000 |
commit | 924a0a84057556938e1310084c99955e71a871d6 (patch) | |
tree | 847054f8e61180f8fb62021481f79fe681c02f42 | |
parent | c66af72a286cb4d793a395f9e9ffae31f056f81d (diff) | |
download | org.eclipse.tcf.agent-924a0a84057556938e1310084c99955e71a871d6.tar.gz org.eclipse.tcf.agent-924a0a84057556938e1310084c99955e71a871d6.tar.xz org.eclipse.tcf.agent-924a0a84057556938e1310084c99955e71a871d6.zip |
TCF Agent: implemented optional output queue for TCF communication channel.
In addition to improving the agent performance, the queue solves the problem of SSL and PIPE channels being prone to deadlocks.
-rw-r--r-- | agent.vcproj | 8 | ||||
-rw-r--r-- | framework/channel.h | 9 | ||||
-rw-r--r-- | framework/channel_tcp.c | 285 | ||||
-rw-r--r-- | framework/outputbuf.c | 116 | ||||
-rw-r--r-- | framework/outputbuf.h | 51 | ||||
-rw-r--r-- | server/server.vcproj | 8 |
6 files changed, 364 insertions, 113 deletions
diff --git a/agent.vcproj b/agent.vcproj index 847144e7..6341a515 100644 --- a/agent.vcproj +++ b/agent.vcproj @@ -336,6 +336,14 @@ >
</File>
<File
+ RelativePath=".\framework\outputbuf.c"
+ >
+ </File>
+ <File
+ RelativePath=".\framework\outputbuf.h"
+ >
+ </File>
+ <File
RelativePath=".\framework\peer.c"
>
</File>
diff --git a/framework/channel.h b/framework/channel.h index 95c5d55d..a81c927c 100644 --- a/framework/channel.h +++ b/framework/channel.h @@ -23,7 +23,11 @@ #include <framework/link.h> #include <framework/peer.h> +struct Protocol; typedef struct TCFBroadcastGroup TCFBroadcastGroup; +typedef struct ChannelServer ChannelServer; +typedef struct Channel Channel; + struct TCFBroadcastGroup { int magic; OutputStream out; /* Broadcast stream */ @@ -41,9 +45,6 @@ enum { ChannelStateDisconnected }; -struct Protocol; -typedef struct Channel Channel; - struct Channel { InputStream inp; /* Input stream */ OutputStream out; /* Output stream */ @@ -75,8 +76,6 @@ struct Channel { void (*disconnected)(Channel *); /* Called when channel is disconnected */ }; -typedef struct ChannelServer ChannelServer; - struct ChannelServer { void * client_data; /* Client data */ void (*new_conn)(ChannelServer *, Channel *); /* New connection call back */ diff --git a/framework/channel_tcp.c b/framework/channel_tcp.c index 7a9c881d..8ac25eb3 100644 --- a/framework/channel_tcp.c +++ b/framework/channel_tcp.c @@ -52,6 +52,7 @@ #include <framework/ip_ifc.h> #include <framework/asyncreq.h> #include <framework/inputbuf.h> +#include <framework/outputbuf.h> #include <services/discovery.h> #ifndef MSG_MORE @@ -67,6 +68,14 @@ #define CHANNEL_MAGIC 0x27208956 #define MAX_IFC 10 +#if !defined(ENABLE_OutputQueue) +# if ENABLE_SSL || ENABLE_ContextProxy || defined(WIN32) +# define ENABLE_OutputQueue 1 +# else +# define ENABLE_OutputQueue 0 +# endif +#endif + typedef struct ChannelTCP ChannelTCP; struct ChannelTCP { @@ -91,13 +100,17 @@ struct ChannelTCP { InputBuf ibuf; /* Output stream state */ + unsigned char * out_bin_block; unsigned char obuf[BUF_SIZE]; int out_errno; int out_flush_cnt; - unsigned char * out_bin_block; +#if ENABLE_OutputQueue + OutputQueue out_queue; + AsyncReqInfo wr_req; +#endif /* ENABLE_OutputQueue */ /* Async read request */ - AsyncReqInfo rdreq; + AsyncReqInfo rd_req; }; typedef struct ServerTCP ServerTCP; @@ -118,6 +131,7 @@ struct ServerTCP { #define server2tcp(A) ((ServerTCP *)((char *)(A) - offsetof(ServerTCP, serv))) #define servlink2tcp(A) ((ServerTCP *)((char *)(A) - offsetof(ServerTCP, servlink))) #define ibuf2tcp(A) ((ChannelTCP *)((char *)(A) - offsetof(ChannelTCP, ibuf))) +#define obuf2tcp(A) ((ChannelTCP *)((char *)(A) - offsetof(ChannelTCP, out_queue))) static LINK server_list; static void tcp_channel_read_done(void * x); @@ -187,6 +201,9 @@ static void delete_channel(ChannelTCP * c) { assert(c->ibuf.handling_msg != HandleMsgTriggered); channel_clear_broadcast_group(&c->chan); c->magic = 0; +#if ENABLE_OutputQueue + output_queue_clear(&c->out_queue); +#endif /* ENABLE_OutputQueue */ #if ENABLE_SSL if (c->ssl) SSL_free(c->ssl); #endif /* ENABLE_SSL */ @@ -227,63 +244,104 @@ static int tcp_is_closed(Channel * channel) { return c->chan.state == ChannelStateDisconnected; } +#if ENABLE_OutputQueue +static void done_write_request(void * args) { + ChannelTCP * c = (ChannelTCP *)((AsyncReqInfo *)args)->client_data; + int size = 0; + int error = 0; + + assert(args == &c->wr_req); + if (c->wr_req.u.sio.rval < 0) error = c->wr_req.error; + else if (c->wr_req.type == AsyncReqSend) size = c->wr_req.u.sio.rval; + output_queue_done(&c->out_queue, error, size); + if (error) c->out_errno = error; + if (output_queue_is_empty(&c->out_queue) && + c->chan.state == ChannelStateDisconnected) shutdown(c->socket, SHUT_WR); + tcp_unlock(&c->chan); +} + +static void post_write_request(OutputBuffer * bf) { + ChannelTCP * c = obuf2tcp(bf->queue); + c->wr_req.client_data = c; + c->wr_req.done = done_write_request; +#if ENABLE_SSL + if (c->ssl) { + int wr = SSL_write(c->ssl, bf->buf + bf->buf_pos, bf->buf_len - bf->buf_pos); + if (wr <= 0) { + int err = SSL_get_error(c->ssl, wr); + if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { + c->wr_req.type = AsyncReqSelect; + c->wr_req.u.select.nfds = c->socket + 1; + FD_ZERO(&c->wr_req.u.select.readfds); + FD_ZERO(&c->wr_req.u.select.writefds); + FD_ZERO(&c->wr_req.u.select.errorfds); + if (err == SSL_ERROR_WANT_WRITE) FD_SET(c->socket, &c->wr_req.u.select.writefds); + if (err == SSL_ERROR_WANT_READ) FD_SET(c->socket, &c->wr_req.u.select.readfds); + FD_SET(c->socket, &c->wr_req.u.select.errorfds); + c->wr_req.u.select.timeout.tv_sec = 10; + async_req_post(&c->wr_req); + } + else { + int error = set_ssl_errno(); + c->chan.out.cur = c->obuf; + trace(LOG_PROTOCOL, "Can't SSL_write() on channel %#lx: %s", c, errno_to_str(error)); + c->wr_req.type = AsyncReqSend; + c->wr_req.error = error; + c->wr_req.u.sio.rval = -1; + post_event(done_write_request, &c->wr_req); + } + } + else { + c->wr_req.type = AsyncReqSend; + c->wr_req.error = 0; + c->wr_req.u.sio.rval = wr; + post_event(done_write_request, &c->wr_req); + } + } + else +#endif + { + c->wr_req.type = AsyncReqSend; + c->wr_req.u.sio.sock = c->socket; + c->wr_req.u.sio.bufp = bf->buf + bf->buf_pos; + c->wr_req.u.sio.bufsz = bf->buf_len - bf->buf_pos; + c->wr_req.u.sio.flags = c->out_queue.queue.next == c->out_queue.queue.prev ? 0 : MSG_MORE; + async_req_post(&c->wr_req); + } + tcp_lock(&c->chan); +} +#endif /* ENABLE_OutputQueue */ + static void tcp_flush_with_flags(ChannelTCP * c, int flags) { unsigned char * p = c->obuf; assert(is_dispatch_thread()); assert(c->magic == CHANNEL_MAGIC); assert(c->chan.out.end == p + sizeof(c->obuf)); assert(c->out_bin_block == NULL); + assert(c->chan.out.cur >= p); + assert(c->chan.out.cur <= p + sizeof(c->obuf)); if (c->chan.out.cur == p) return; - assert(c->chan.out.cur >= p && c->chan.out.cur <= p + sizeof(c->obuf)); - if (c->chan.state == ChannelStateDisconnected || c->out_errno) { - c->chan.out.cur = p; - return; - } - while (p < c->chan.out.cur) { - int wr = 0; - if (c->ssl) { -#if ENABLE_SSL - wr = SSL_write(c->ssl, p, c->chan.out.cur - p); - if (wr <= 0) { - int err = SSL_get_error(c->ssl, wr); - if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { - struct timeval tv; - fd_set readfds; - fd_set writefds; - fd_set errorfds; - FD_ZERO(&readfds); - FD_ZERO(&writefds); - FD_ZERO(&errorfds); - if (err == SSL_ERROR_WANT_READ) FD_SET(c->socket, &readfds); - if (err == SSL_ERROR_WANT_WRITE) FD_SET(c->socket, &writefds); - FD_SET(c->socket, &errorfds); - tv.tv_sec = 10L; - tv.tv_usec = 0; - if (select(c->socket + 1, &readfds, &writefds, &errorfds, &tv) >= 0) continue; - } - trace(LOG_PROTOCOL, "Can't SSL_write() on channel %#lx: %s", c, - ERR_error_string(ERR_get_error(), NULL)); - c->out_errno = EIO; - c->chan.out.cur = c->obuf; - return; - } + if (c->chan.state != ChannelStateDisconnected && c->out_errno == 0) { +#if ENABLE_OutputQueue + c->out_queue.post_io_request = post_write_request; + output_queue_add(&c->out_queue, p, c->chan.out.cur - p); #else - assert(0); -#endif - } - else { - wr = send(c->socket, p, c->chan.out.cur - p, flags); + assert(c->ssl == NULL); + while (p < c->chan.out.cur) { + int sz = c->chan.out.cur - p; + int wr = send(c->socket, p, sz, flags); if (wr < 0) { int err = errno; - trace(LOG_PROTOCOL, "Can't send() on channel %#lx: %d %s", c, err, errno_to_str(err)); + trace(LOG_PROTOCOL, "Can't send() on channel %#lx: %s", c, errno_to_str(err)); c->out_errno = err; c->chan.out.cur = c->obuf; return; } + p += wr; } - p += wr; + assert(p == c->chan.out.cur); +#endif } - assert(p == c->chan.out.cur); c->chan.out.cur = c->obuf; } @@ -327,8 +385,6 @@ static void tcp_write_stream(OutputStream * out, int byte) { assert(c->magic == CHANNEL_MAGIC); if (!c->chan.out.supports_zero_copy || c->chan.out.cur >= c->chan.out.end - 32 || byte < 0) { if (c->out_bin_block != NULL) tcp_bin_block_end(c); - if (c->out_errno) return; - if (c->chan.state == ChannelStateDisconnected) return; if (c->chan.out.cur == c->chan.out.end) tcp_flush_with_flags(c, MSG_MORE); if (byte < 0 || byte == ESC) { char esc = 0; @@ -337,8 +393,6 @@ static void tcp_write_stream(OutputStream * out, int byte) { else if (byte == MARKER_EOM) esc = 1; else if (byte == MARKER_EOS) esc = 2; else assert(0); - if (c->out_errno) return; - if (c->chan.state == ChannelStateDisconnected) return; if (c->chan.out.cur == c->chan.out.end) tcp_flush_with_flags(c, MSG_MORE); *c->chan.out.cur++ = esc; if (byte == MARKER_EOM && c->out_flush_cnt < 4) { @@ -385,6 +439,13 @@ static int tcp_splice_block_stream(OutputStream * out, int fd, size_t size, off_ /* We need to flush the buffer then send our data */ tcp_flush_with_flags(c, MSG_MORE); +#if ENABLE_OutputQueue + while (!output_queue_is_empty(&c->out_queue)) { + cancel_event(done_write_request, &c->wr_req, 1); + done_write_request(&c->wr_req); + } +#endif + if (c->chan.state == ChannelStateDisconnected) return rd; if (c->out_errno) return rd; @@ -394,7 +455,7 @@ static int tcp_splice_block_stream(OutputStream * out, int fd, size_t size, off_ if (wr < 0) { c->out_errno = errno; - trace(LOG_PROTOCOL, "Error in socket splice: %d %s", errno, errno_to_str(errno)); + trace(LOG_PROTOCOL, "Error in socket splice: %s", errno_to_str(errno)); break; } n -= wr; @@ -430,26 +491,39 @@ static void tcp_post_read(InputBuf * ibuf, unsigned char * buf, int size) { if (c->ssl) { #if ENABLE_SSL c->read_done = SSL_read(c->ssl, c->read_buf, c->read_buf_size); - if (c->read_done > 0) { - post_event(c->rdreq.done, &c->rdreq); - return; + if (c->read_done <= 0) { + int err = SSL_get_error(c->ssl, c->read_done); + if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { + FD_ZERO(&c->rd_req.u.select.readfds); + FD_ZERO(&c->rd_req.u.select.writefds); + FD_ZERO(&c->rd_req.u.select.errorfds); + if (err == SSL_ERROR_WANT_WRITE) FD_SET(c->socket, &c->rd_req.u.select.writefds); + if (err == SSL_ERROR_WANT_READ) FD_SET(c->socket, &c->rd_req.u.select.readfds); + FD_SET(c->socket, &c->rd_req.u.select.errorfds); + c->rd_req.u.select.timeout.tv_sec = 10; + c->read_done = -1; + async_req_post(&c->rd_req); + } + else { + if (c->chan.state != ChannelStateDisconnected) { + trace(LOG_ALWAYS, "Can't SSL_read() on channel %#lx: %s", c, errno_to_str(set_ssl_errno())); + } + c->read_done = 0; + post_event(c->rd_req.done, &c->rd_req); + } + } + else { + post_event(c->rd_req.done, &c->rd_req); } - FD_ZERO(&c->rdreq.u.select.readfds); - FD_ZERO(&c->rdreq.u.select.writefds); - FD_ZERO(&c->rdreq.u.select.errorfds); - FD_SET(c->socket, &c->rdreq.u.select.readfds); - FD_SET(c->socket, &c->rdreq.u.select.errorfds); - c->rdreq.u.select.timeout.tv_sec = 10; #else assert(0); #endif } else { - c->read_done = 0; - c->rdreq.u.sio.bufp = buf; - c->rdreq.u.sio.bufsz = size; + c->rd_req.u.sio.bufp = buf; + c->rd_req.u.sio.bufsz = size; + async_req_post(&c->rd_req); } - async_req_post(&c->rdreq); } static void tcp_wait_read(InputBuf * ibuf) { @@ -458,8 +532,8 @@ static void tcp_wait_read(InputBuf * ibuf) { /* Wait for read to complete */ assert(c->lock_cnt > 0); assert(c->read_pending != 0); - cancel_event(tcp_channel_read_done, &c->rdreq, 1); - tcp_channel_read_done(&c->rdreq); + cancel_event(tcp_channel_read_done, &c->rd_req, 1); + tcp_channel_read_done(&c->rd_req); } static int tcp_read_stream(InputStream * inp) { @@ -495,6 +569,9 @@ static void send_eof_and_close(Channel * channel, int err) { write_errno(&c->chan.out, err); write_stream(&c->chan.out, MARKER_EOM); tcp_flush_with_flags(c, 0); +#if ENABLE_OutputQueue + if (output_queue_is_empty(&c->out_queue)) +#endif shutdown(c->socket, SHUT_WR); c->chan.state = ChannelStateDisconnected; tcp_post_read(&c->ibuf, c->obuf, sizeof(c->obuf)); @@ -537,8 +614,7 @@ static void handle_channel_msg(void * x) { clear_trap(&trap); } else { - trace(LOG_ALWAYS, "Exception in message handler: %d %s", - trap.error, errno_to_str(trap.error)); + trace(LOG_ALWAYS, "Exception in message handler: %s", errno_to_str(trap.error)); send_eof_and_close(&c->chan, trap.error); } } @@ -583,32 +659,19 @@ static void tcp_channel_read_done(void * x) { c->read_pending = 0; if (c->ssl) { #if ENABLE_SSL - if (c->read_done > 0) { - len = c->read_done; - } - else { - len = SSL_read(c->ssl, c->read_buf, c->read_buf_size); - if (len <= 0) { - int err = SSL_get_error(c->ssl, len); - if (err == SSL_ERROR_WANT_READ) { - tcp_post_read(&c->ibuf, c->read_buf, c->read_buf_size); - return; - } - if (c->chan.state != ChannelStateDisconnected) { - trace(LOG_ALWAYS, "Can't SSL_read() on channel %#lx: %s", c, - ERR_error_string(ERR_get_error(), NULL)); - } - len = 0; - } + if (c->read_done < 0) { + tcp_post_read(&c->ibuf, c->read_buf, c->read_buf_size); + return; } + len = c->read_done; #else assert(0); #endif } else { - assert(c->read_buf == c->rdreq.u.sio.bufp); - assert((size_t)c->read_buf_size == c->rdreq.u.sio.bufsz); - len = c->rdreq.u.sio.rval; + assert(c->read_buf == c->rd_req.u.sio.bufp); + assert((size_t)c->read_buf_size == c->rd_req.u.sio.bufsz); + len = c->rd_req.u.sio.rval; if (req->error) { if (c->chan.state != ChannelStateDisconnected) { trace(LOG_ALWAYS, "Can't read from socket: %s", errno_to_str(req->error)); @@ -655,14 +718,12 @@ static ChannelTCP * create_channel(int sock, int en_ssl, int server, int unix_do if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&i, sizeof(i)) < 0) { int error = errno; trace(LOG_ALWAYS, "Can't set TCP_NODELAY option on a socket: %s", errno_to_str(error)); - closesocket(sock); errno = error; return NULL; } if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, sizeof(i)) < 0) { int error = errno; trace(LOG_ALWAYS, "Can't set SO_KEEPALIVE option on a socket: %s", errno_to_str(error)); - closesocket(sock); errno = error; return NULL; } @@ -686,10 +747,12 @@ static ChannelTCP * create_channel(int sock, int en_ssl, int server, int unix_do if (!err && (fp = fopen(fnm, "r")) == NULL) err = errno; if (!err && (rsa_key = PEM_read_RSAPrivateKey(fp, NULL, NULL, NULL)) == NULL) err = set_ssl_errno(); if (!err && fclose(fp) != 0) err = errno; - snprintf(fnm, sizeof(fnm), "%s/ssl/local.cert", tcf_dir); - if (!err && (fp = fopen(fnm, "r")) == NULL) err = errno; - if (!err && (ssl_cert = PEM_read_X509(fp, NULL, NULL, NULL)) == NULL) err = set_ssl_errno(); - if (!err && fclose(fp) != 0) err = errno; + if (!err) { + snprintf(fnm, sizeof(fnm), "%s/ssl/local.cert", tcf_dir); + if (!err && (fp = fopen(fnm, "r")) == NULL) err = errno; + if (!err && (ssl_cert = PEM_read_X509(fp, NULL, NULL, NULL)) == NULL) err = set_ssl_errno(); + if (!err && fclose(fp) != 0) err = errno; + } if (err) { trace(LOG_ALWAYS, "Cannot read SSL certificate %s: %s", fnm, errno_to_str(err)); set_errno(err, "Cannot read SSL certificate"); @@ -743,21 +806,24 @@ static ChannelTCP * create_channel(int sock, int en_ssl, int server, int unix_do c->ibuf.trigger_message = tcp_trigger_message; c->socket = sock; c->lock_cnt = 1; - c->rdreq.done = tcp_channel_read_done; - c->rdreq.client_data = c; + c->rd_req.done = tcp_channel_read_done; + c->rd_req.client_data = c; if (c->ssl) { #if ENABLE_SSL - c->rdreq.type = AsyncReqSelect; - c->rdreq.u.select.nfds = c->socket + 1; + c->rd_req.type = AsyncReqSelect; + c->rd_req.u.select.nfds = c->socket + 1; #else assert(0); #endif } else { - c->rdreq.type = AsyncReqRecv; - c->rdreq.u.sio.sock = c->socket; - c->rdreq.u.sio.flags = 0; + c->rd_req.type = AsyncReqRecv; + c->rd_req.u.sio.sock = c->socket; + c->rd_req.u.sio.flags = 0; } +#if ENABLE_OutputQueue + output_queue_ini(&c->out_queue); +#endif return c; } @@ -860,7 +926,6 @@ static void set_peer_addr(ChannelTCP * c, struct sockaddr * addr, int addr_len) static void tcp_server_accept_done(void * x) { AsyncReqInfo * req = (AsyncReqInfo *)x; ServerTCP * si = (ServerTCP *)req->client_data; - ChannelTCP * c = NULL; if (si->sock < 0) { /* Server closed. */ @@ -868,16 +933,20 @@ static void tcp_server_accept_done(void * x) { return; } if (req->error) { - trace(LOG_ALWAYS, "socket accept failed: %d %s", req->error, errno_to_str(req->error)); - si->accreq.u.acc.addrlen = si->addr_len; - async_req_post(req); - return; + trace(LOG_ALWAYS, "Socket accept failed: %s", errno_to_str(req->error)); } - c = create_channel(req->u.acc.rval, strcmp(peer_server_getprop(si->ps, "TransportName", ""), "SSL") == 0, 1, - si->addr_buf->sa_family == AF_UNIX); - if (c != NULL) { - set_peer_addr(c, si->addr_buf, si->addr_len); - si->serv.new_conn(&si->serv, &c->chan); + else { + int ssl = strcmp(peer_server_getprop(si->ps, "TransportName", ""), "SSL") == 0; + int unix_domain = si->addr_buf->sa_family == AF_UNIX; + ChannelTCP * c = create_channel(req->u.acc.rval, ssl, 1, unix_domain); + if (c == NULL) { + trace(LOG_ALWAYS, "Cannot create channel for accepted connection: %s", errno_to_str(errno)); + closesocket(req->u.acc.rval); + } + else { + set_peer_addr(c, si->addr_buf, si->addr_len); + si->serv.new_conn(&si->serv, &c->chan); + } } si->accreq.u.acc.addrlen = si->addr_len; async_req_post(req); diff --git a/framework/outputbuf.c b/framework/outputbuf.c new file mode 100644 index 00000000..a4ca8672 --- /dev/null +++ b/framework/outputbuf.c @@ -0,0 +1,116 @@ +/******************************************************************************* + * Copyright (q) 2007, 2010 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ + +/* + * Uttility module that implements an abstarct output queue. + */ + +#include <config.h> +#include <assert.h> +#include <framework/outputbuf.h> +#include <framework/myalloc.h> +#include <framework/trace.h> +#include <framework/errors.h> + +#define link2buf(A) ((OutputBuffer *)((char *)(A) - offsetof(OutputBuffer, link))) + +void output_queue_ini(OutputQueue * q) { + list_init(&q->queue); + list_init(&q->pool); +} + +void output_queue_add(OutputQueue * q, const void * buf, size_t size) { + if (q->error) return; + if (q->queue.next != q->queue.prev) { + /* Append data to the last pending buffer */ + size_t gap = 0; + OutputBuffer * bf = link2buf(q->queue.prev); + assert(bf->buf_pos == 0); + gap = sizeof(bf->buf) - bf->buf_len; + if (gap > 0) { + size_t len = size; + if (len > gap) len = gap; + memcpy(bf->buf + bf->buf_len, buf, len); + bf->buf_len += len; + buf = (const char *)buf + len; + size -= len; + } + } + while (size > 0) { + size_t len = size; + OutputBuffer * bf = NULL; + if (list_is_empty(&q->pool)) { + bf = (OutputBuffer *)loc_alloc_zero(sizeof(OutputBuffer)); + bf->queue = q; + } + else { + bf = link2buf(q->pool.next); + list_remove(&bf->link); + q->pool_size--; + } + if (len > sizeof(bf->buf)) len = sizeof(bf->buf); + bf->buf_pos = 0; + bf->buf_len = len; + memcpy(bf->buf, buf, len); + list_add_last(&bf->link, &q->queue); + if (q->queue.next == &bf->link) { + q->post_io_request(bf); + } + buf = (const char *)buf + len; + size -= len; + } +} + +void output_queue_done(OutputQueue * q, int error, int size) { + OutputBuffer * bf = link2buf(q->queue.next); + + assert(q->error == 0); + if (error) { + q->error = error; + trace(LOG_PROTOCOL, "Can't write() on output queue %#lx: %s", q, errno_to_str(q->error)); + output_queue_clear(q); + } + else { + bf->buf_pos += size; + if (bf->buf_pos < bf->buf_len) { + /* Nothing */ + } + else if (q->pool_size < 8) { + list_remove(&bf->link); + list_add_last(&bf->link, &q->pool); + q->pool_size++; + } + else { + list_remove(&bf->link); + loc_free(bf); + } + } + if (!list_is_empty(&q->queue)) { + bf = link2buf(q->queue.next); + q->post_io_request(bf); + } +} + +void output_queue_clear(OutputQueue * q) { + while (!list_is_empty(&q->queue)) { + OutputBuffer * bf = link2buf(q->queue.next); + list_remove(&bf->link); + loc_free(bf); + } + while (!list_is_empty(&q->pool)) { + OutputBuffer * bf = link2buf(q->pool.next); + list_remove(&bf->link); + loc_free(bf); + } +} diff --git a/framework/outputbuf.h b/framework/outputbuf.h new file mode 100644 index 00000000..a5f56bde --- /dev/null +++ b/framework/outputbuf.h @@ -0,0 +1,51 @@ +/******************************************************************************* + * Copyright (c) 2007, 2010 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ + +/* + * Uttility module that implements an abstarct output queue. + */ + +#ifndef D_outputbuf +#define D_outputbuf + +#include <config.h> +#include <framework/link.h> + +typedef struct OutputQueue OutputQueue; +typedef struct OutputBuffer OutputBuffer; + +struct OutputQueue { + int error; + LINK queue; + LINK pool; + int pool_size; + void (*post_io_request)(OutputBuffer *); +}; + +struct OutputBuffer { + LINK link; + OutputQueue * queue; + char buf[128 * MEM_USAGE_FACTOR]; + size_t buf_len; + size_t buf_pos; +}; + +#define output_queue_is_empty(q) (list_is_empty(&(q)->queue)) + +extern void output_queue_ini(OutputQueue * q); +extern void output_queue_add(OutputQueue * q, const void * buf, size_t size); +extern void output_queue_done(OutputQueue * q, int error, int size); +extern void output_queue_clear(OutputQueue * q); + +#endif /* D_outputbuf */ diff --git a/server/server.vcproj b/server/server.vcproj index b4661bea..04943e17 100644 --- a/server/server.vcproj +++ b/server/server.vcproj @@ -467,6 +467,14 @@ >
</File>
<File
+ RelativePath="..\agent\framework\outputbuf.c"
+ >
+ </File>
+ <File
+ RelativePath="..\agent\framework\outputbuf.h"
+ >
+ </File>
+ <File
RelativePath="..\agent\framework\peer.c"
>
</File>
|