Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreutarass2010-12-21 20:32:53 +0000
committereutarass2010-12-21 20:32:53 +0000
commit924a0a84057556938e1310084c99955e71a871d6 (patch)
tree847054f8e61180f8fb62021481f79fe681c02f42
parentc66af72a286cb4d793a395f9e9ffae31f056f81d (diff)
downloadorg.eclipse.tcf.agent-924a0a84057556938e1310084c99955e71a871d6.tar.gz
org.eclipse.tcf.agent-924a0a84057556938e1310084c99955e71a871d6.tar.xz
org.eclipse.tcf.agent-924a0a84057556938e1310084c99955e71a871d6.zip
TCF Agent: implemented optional output queue for TCF communication channel.
In addition to improving the agent performance, the queue solves the problem of SSL and PIPE channels being prone to deadlocks.
-rw-r--r--agent.vcproj8
-rw-r--r--framework/channel.h9
-rw-r--r--framework/channel_tcp.c285
-rw-r--r--framework/outputbuf.c116
-rw-r--r--framework/outputbuf.h51
-rw-r--r--server/server.vcproj8
6 files changed, 364 insertions, 113 deletions
diff --git a/agent.vcproj b/agent.vcproj
index 847144e7..6341a515 100644
--- a/agent.vcproj
+++ b/agent.vcproj
@@ -336,6 +336,14 @@
>
</File>
<File
+ RelativePath=".\framework\outputbuf.c"
+ >
+ </File>
+ <File
+ RelativePath=".\framework\outputbuf.h"
+ >
+ </File>
+ <File
RelativePath=".\framework\peer.c"
>
</File>
diff --git a/framework/channel.h b/framework/channel.h
index 95c5d55d..a81c927c 100644
--- a/framework/channel.h
+++ b/framework/channel.h
@@ -23,7 +23,11 @@
#include <framework/link.h>
#include <framework/peer.h>
+struct Protocol;
typedef struct TCFBroadcastGroup TCFBroadcastGroup;
+typedef struct ChannelServer ChannelServer;
+typedef struct Channel Channel;
+
struct TCFBroadcastGroup {
int magic;
OutputStream out; /* Broadcast stream */
@@ -41,9 +45,6 @@ enum {
ChannelStateDisconnected
};
-struct Protocol;
-typedef struct Channel Channel;
-
struct Channel {
InputStream inp; /* Input stream */
OutputStream out; /* Output stream */
@@ -75,8 +76,6 @@ struct Channel {
void (*disconnected)(Channel *); /* Called when channel is disconnected */
};
-typedef struct ChannelServer ChannelServer;
-
struct ChannelServer {
void * client_data; /* Client data */
void (*new_conn)(ChannelServer *, Channel *); /* New connection call back */
diff --git a/framework/channel_tcp.c b/framework/channel_tcp.c
index 7a9c881d..8ac25eb3 100644
--- a/framework/channel_tcp.c
+++ b/framework/channel_tcp.c
@@ -52,6 +52,7 @@
#include <framework/ip_ifc.h>
#include <framework/asyncreq.h>
#include <framework/inputbuf.h>
+#include <framework/outputbuf.h>
#include <services/discovery.h>
#ifndef MSG_MORE
@@ -67,6 +68,14 @@
#define CHANNEL_MAGIC 0x27208956
#define MAX_IFC 10
+#if !defined(ENABLE_OutputQueue)
+# if ENABLE_SSL || ENABLE_ContextProxy || defined(WIN32)
+# define ENABLE_OutputQueue 1
+# else
+# define ENABLE_OutputQueue 0
+# endif
+#endif
+
typedef struct ChannelTCP ChannelTCP;
struct ChannelTCP {
@@ -91,13 +100,17 @@ struct ChannelTCP {
InputBuf ibuf;
/* Output stream state */
+ unsigned char * out_bin_block;
unsigned char obuf[BUF_SIZE];
int out_errno;
int out_flush_cnt;
- unsigned char * out_bin_block;
+#if ENABLE_OutputQueue
+ OutputQueue out_queue;
+ AsyncReqInfo wr_req;
+#endif /* ENABLE_OutputQueue */
/* Async read request */
- AsyncReqInfo rdreq;
+ AsyncReqInfo rd_req;
};
typedef struct ServerTCP ServerTCP;
@@ -118,6 +131,7 @@ struct ServerTCP {
#define server2tcp(A) ((ServerTCP *)((char *)(A) - offsetof(ServerTCP, serv)))
#define servlink2tcp(A) ((ServerTCP *)((char *)(A) - offsetof(ServerTCP, servlink)))
#define ibuf2tcp(A) ((ChannelTCP *)((char *)(A) - offsetof(ChannelTCP, ibuf)))
+#define obuf2tcp(A) ((ChannelTCP *)((char *)(A) - offsetof(ChannelTCP, out_queue)))
static LINK server_list;
static void tcp_channel_read_done(void * x);
@@ -187,6 +201,9 @@ static void delete_channel(ChannelTCP * c) {
assert(c->ibuf.handling_msg != HandleMsgTriggered);
channel_clear_broadcast_group(&c->chan);
c->magic = 0;
+#if ENABLE_OutputQueue
+ output_queue_clear(&c->out_queue);
+#endif /* ENABLE_OutputQueue */
#if ENABLE_SSL
if (c->ssl) SSL_free(c->ssl);
#endif /* ENABLE_SSL */
@@ -227,63 +244,104 @@ static int tcp_is_closed(Channel * channel) {
return c->chan.state == ChannelStateDisconnected;
}
+#if ENABLE_OutputQueue
+static void done_write_request(void * args) {
+ ChannelTCP * c = (ChannelTCP *)((AsyncReqInfo *)args)->client_data;
+ int size = 0;
+ int error = 0;
+
+ assert(args == &c->wr_req);
+ if (c->wr_req.u.sio.rval < 0) error = c->wr_req.error;
+ else if (c->wr_req.type == AsyncReqSend) size = c->wr_req.u.sio.rval;
+ output_queue_done(&c->out_queue, error, size);
+ if (error) c->out_errno = error;
+ if (output_queue_is_empty(&c->out_queue) &&
+ c->chan.state == ChannelStateDisconnected) shutdown(c->socket, SHUT_WR);
+ tcp_unlock(&c->chan);
+}
+
+static void post_write_request(OutputBuffer * bf) {
+ ChannelTCP * c = obuf2tcp(bf->queue);
+ c->wr_req.client_data = c;
+ c->wr_req.done = done_write_request;
+#if ENABLE_SSL
+ if (c->ssl) {
+ int wr = SSL_write(c->ssl, bf->buf + bf->buf_pos, bf->buf_len - bf->buf_pos);
+ if (wr <= 0) {
+ int err = SSL_get_error(c->ssl, wr);
+ if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
+ c->wr_req.type = AsyncReqSelect;
+ c->wr_req.u.select.nfds = c->socket + 1;
+ FD_ZERO(&c->wr_req.u.select.readfds);
+ FD_ZERO(&c->wr_req.u.select.writefds);
+ FD_ZERO(&c->wr_req.u.select.errorfds);
+ if (err == SSL_ERROR_WANT_WRITE) FD_SET(c->socket, &c->wr_req.u.select.writefds);
+ if (err == SSL_ERROR_WANT_READ) FD_SET(c->socket, &c->wr_req.u.select.readfds);
+ FD_SET(c->socket, &c->wr_req.u.select.errorfds);
+ c->wr_req.u.select.timeout.tv_sec = 10;
+ async_req_post(&c->wr_req);
+ }
+ else {
+ int error = set_ssl_errno();
+ c->chan.out.cur = c->obuf;
+ trace(LOG_PROTOCOL, "Can't SSL_write() on channel %#lx: %s", c, errno_to_str(error));
+ c->wr_req.type = AsyncReqSend;
+ c->wr_req.error = error;
+ c->wr_req.u.sio.rval = -1;
+ post_event(done_write_request, &c->wr_req);
+ }
+ }
+ else {
+ c->wr_req.type = AsyncReqSend;
+ c->wr_req.error = 0;
+ c->wr_req.u.sio.rval = wr;
+ post_event(done_write_request, &c->wr_req);
+ }
+ }
+ else
+#endif
+ {
+ c->wr_req.type = AsyncReqSend;
+ c->wr_req.u.sio.sock = c->socket;
+ c->wr_req.u.sio.bufp = bf->buf + bf->buf_pos;
+ c->wr_req.u.sio.bufsz = bf->buf_len - bf->buf_pos;
+ c->wr_req.u.sio.flags = c->out_queue.queue.next == c->out_queue.queue.prev ? 0 : MSG_MORE;
+ async_req_post(&c->wr_req);
+ }
+ tcp_lock(&c->chan);
+}
+#endif /* ENABLE_OutputQueue */
+
static void tcp_flush_with_flags(ChannelTCP * c, int flags) {
unsigned char * p = c->obuf;
assert(is_dispatch_thread());
assert(c->magic == CHANNEL_MAGIC);
assert(c->chan.out.end == p + sizeof(c->obuf));
assert(c->out_bin_block == NULL);
+ assert(c->chan.out.cur >= p);
+ assert(c->chan.out.cur <= p + sizeof(c->obuf));
if (c->chan.out.cur == p) return;
- assert(c->chan.out.cur >= p && c->chan.out.cur <= p + sizeof(c->obuf));
- if (c->chan.state == ChannelStateDisconnected || c->out_errno) {
- c->chan.out.cur = p;
- return;
- }
- while (p < c->chan.out.cur) {
- int wr = 0;
- if (c->ssl) {
-#if ENABLE_SSL
- wr = SSL_write(c->ssl, p, c->chan.out.cur - p);
- if (wr <= 0) {
- int err = SSL_get_error(c->ssl, wr);
- if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
- struct timeval tv;
- fd_set readfds;
- fd_set writefds;
- fd_set errorfds;
- FD_ZERO(&readfds);
- FD_ZERO(&writefds);
- FD_ZERO(&errorfds);
- if (err == SSL_ERROR_WANT_READ) FD_SET(c->socket, &readfds);
- if (err == SSL_ERROR_WANT_WRITE) FD_SET(c->socket, &writefds);
- FD_SET(c->socket, &errorfds);
- tv.tv_sec = 10L;
- tv.tv_usec = 0;
- if (select(c->socket + 1, &readfds, &writefds, &errorfds, &tv) >= 0) continue;
- }
- trace(LOG_PROTOCOL, "Can't SSL_write() on channel %#lx: %s", c,
- ERR_error_string(ERR_get_error(), NULL));
- c->out_errno = EIO;
- c->chan.out.cur = c->obuf;
- return;
- }
+ if (c->chan.state != ChannelStateDisconnected && c->out_errno == 0) {
+#if ENABLE_OutputQueue
+ c->out_queue.post_io_request = post_write_request;
+ output_queue_add(&c->out_queue, p, c->chan.out.cur - p);
#else
- assert(0);
-#endif
- }
- else {
- wr = send(c->socket, p, c->chan.out.cur - p, flags);
+ assert(c->ssl == NULL);
+ while (p < c->chan.out.cur) {
+ int sz = c->chan.out.cur - p;
+ int wr = send(c->socket, p, sz, flags);
if (wr < 0) {
int err = errno;
- trace(LOG_PROTOCOL, "Can't send() on channel %#lx: %d %s", c, err, errno_to_str(err));
+ trace(LOG_PROTOCOL, "Can't send() on channel %#lx: %s", c, errno_to_str(err));
c->out_errno = err;
c->chan.out.cur = c->obuf;
return;
}
+ p += wr;
}
- p += wr;
+ assert(p == c->chan.out.cur);
+#endif
}
- assert(p == c->chan.out.cur);
c->chan.out.cur = c->obuf;
}
@@ -327,8 +385,6 @@ static void tcp_write_stream(OutputStream * out, int byte) {
assert(c->magic == CHANNEL_MAGIC);
if (!c->chan.out.supports_zero_copy || c->chan.out.cur >= c->chan.out.end - 32 || byte < 0) {
if (c->out_bin_block != NULL) tcp_bin_block_end(c);
- if (c->out_errno) return;
- if (c->chan.state == ChannelStateDisconnected) return;
if (c->chan.out.cur == c->chan.out.end) tcp_flush_with_flags(c, MSG_MORE);
if (byte < 0 || byte == ESC) {
char esc = 0;
@@ -337,8 +393,6 @@ static void tcp_write_stream(OutputStream * out, int byte) {
else if (byte == MARKER_EOM) esc = 1;
else if (byte == MARKER_EOS) esc = 2;
else assert(0);
- if (c->out_errno) return;
- if (c->chan.state == ChannelStateDisconnected) return;
if (c->chan.out.cur == c->chan.out.end) tcp_flush_with_flags(c, MSG_MORE);
*c->chan.out.cur++ = esc;
if (byte == MARKER_EOM && c->out_flush_cnt < 4) {
@@ -385,6 +439,13 @@ static int tcp_splice_block_stream(OutputStream * out, int fd, size_t size, off_
/* We need to flush the buffer then send our data */
tcp_flush_with_flags(c, MSG_MORE);
+#if ENABLE_OutputQueue
+ while (!output_queue_is_empty(&c->out_queue)) {
+ cancel_event(done_write_request, &c->wr_req, 1);
+ done_write_request(&c->wr_req);
+ }
+#endif
+
if (c->chan.state == ChannelStateDisconnected) return rd;
if (c->out_errno) return rd;
@@ -394,7 +455,7 @@ static int tcp_splice_block_stream(OutputStream * out, int fd, size_t size, off_
if (wr < 0) {
c->out_errno = errno;
- trace(LOG_PROTOCOL, "Error in socket splice: %d %s", errno, errno_to_str(errno));
+ trace(LOG_PROTOCOL, "Error in socket splice: %s", errno_to_str(errno));
break;
}
n -= wr;
@@ -430,26 +491,39 @@ static void tcp_post_read(InputBuf * ibuf, unsigned char * buf, int size) {
if (c->ssl) {
#if ENABLE_SSL
c->read_done = SSL_read(c->ssl, c->read_buf, c->read_buf_size);
- if (c->read_done > 0) {
- post_event(c->rdreq.done, &c->rdreq);
- return;
+ if (c->read_done <= 0) {
+ int err = SSL_get_error(c->ssl, c->read_done);
+ if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
+ FD_ZERO(&c->rd_req.u.select.readfds);
+ FD_ZERO(&c->rd_req.u.select.writefds);
+ FD_ZERO(&c->rd_req.u.select.errorfds);
+ if (err == SSL_ERROR_WANT_WRITE) FD_SET(c->socket, &c->rd_req.u.select.writefds);
+ if (err == SSL_ERROR_WANT_READ) FD_SET(c->socket, &c->rd_req.u.select.readfds);
+ FD_SET(c->socket, &c->rd_req.u.select.errorfds);
+ c->rd_req.u.select.timeout.tv_sec = 10;
+ c->read_done = -1;
+ async_req_post(&c->rd_req);
+ }
+ else {
+ if (c->chan.state != ChannelStateDisconnected) {
+ trace(LOG_ALWAYS, "Can't SSL_read() on channel %#lx: %s", c, errno_to_str(set_ssl_errno()));
+ }
+ c->read_done = 0;
+ post_event(c->rd_req.done, &c->rd_req);
+ }
+ }
+ else {
+ post_event(c->rd_req.done, &c->rd_req);
}
- FD_ZERO(&c->rdreq.u.select.readfds);
- FD_ZERO(&c->rdreq.u.select.writefds);
- FD_ZERO(&c->rdreq.u.select.errorfds);
- FD_SET(c->socket, &c->rdreq.u.select.readfds);
- FD_SET(c->socket, &c->rdreq.u.select.errorfds);
- c->rdreq.u.select.timeout.tv_sec = 10;
#else
assert(0);
#endif
}
else {
- c->read_done = 0;
- c->rdreq.u.sio.bufp = buf;
- c->rdreq.u.sio.bufsz = size;
+ c->rd_req.u.sio.bufp = buf;
+ c->rd_req.u.sio.bufsz = size;
+ async_req_post(&c->rd_req);
}
- async_req_post(&c->rdreq);
}
static void tcp_wait_read(InputBuf * ibuf) {
@@ -458,8 +532,8 @@ static void tcp_wait_read(InputBuf * ibuf) {
/* Wait for read to complete */
assert(c->lock_cnt > 0);
assert(c->read_pending != 0);
- cancel_event(tcp_channel_read_done, &c->rdreq, 1);
- tcp_channel_read_done(&c->rdreq);
+ cancel_event(tcp_channel_read_done, &c->rd_req, 1);
+ tcp_channel_read_done(&c->rd_req);
}
static int tcp_read_stream(InputStream * inp) {
@@ -495,6 +569,9 @@ static void send_eof_and_close(Channel * channel, int err) {
write_errno(&c->chan.out, err);
write_stream(&c->chan.out, MARKER_EOM);
tcp_flush_with_flags(c, 0);
+#if ENABLE_OutputQueue
+ if (output_queue_is_empty(&c->out_queue))
+#endif
shutdown(c->socket, SHUT_WR);
c->chan.state = ChannelStateDisconnected;
tcp_post_read(&c->ibuf, c->obuf, sizeof(c->obuf));
@@ -537,8 +614,7 @@ static void handle_channel_msg(void * x) {
clear_trap(&trap);
}
else {
- trace(LOG_ALWAYS, "Exception in message handler: %d %s",
- trap.error, errno_to_str(trap.error));
+ trace(LOG_ALWAYS, "Exception in message handler: %s", errno_to_str(trap.error));
send_eof_and_close(&c->chan, trap.error);
}
}
@@ -583,32 +659,19 @@ static void tcp_channel_read_done(void * x) {
c->read_pending = 0;
if (c->ssl) {
#if ENABLE_SSL
- if (c->read_done > 0) {
- len = c->read_done;
- }
- else {
- len = SSL_read(c->ssl, c->read_buf, c->read_buf_size);
- if (len <= 0) {
- int err = SSL_get_error(c->ssl, len);
- if (err == SSL_ERROR_WANT_READ) {
- tcp_post_read(&c->ibuf, c->read_buf, c->read_buf_size);
- return;
- }
- if (c->chan.state != ChannelStateDisconnected) {
- trace(LOG_ALWAYS, "Can't SSL_read() on channel %#lx: %s", c,
- ERR_error_string(ERR_get_error(), NULL));
- }
- len = 0;
- }
+ if (c->read_done < 0) {
+ tcp_post_read(&c->ibuf, c->read_buf, c->read_buf_size);
+ return;
}
+ len = c->read_done;
#else
assert(0);
#endif
}
else {
- assert(c->read_buf == c->rdreq.u.sio.bufp);
- assert((size_t)c->read_buf_size == c->rdreq.u.sio.bufsz);
- len = c->rdreq.u.sio.rval;
+ assert(c->read_buf == c->rd_req.u.sio.bufp);
+ assert((size_t)c->read_buf_size == c->rd_req.u.sio.bufsz);
+ len = c->rd_req.u.sio.rval;
if (req->error) {
if (c->chan.state != ChannelStateDisconnected) {
trace(LOG_ALWAYS, "Can't read from socket: %s", errno_to_str(req->error));
@@ -655,14 +718,12 @@ static ChannelTCP * create_channel(int sock, int en_ssl, int server, int unix_do
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&i, sizeof(i)) < 0) {
int error = errno;
trace(LOG_ALWAYS, "Can't set TCP_NODELAY option on a socket: %s", errno_to_str(error));
- closesocket(sock);
errno = error;
return NULL;
}
if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, sizeof(i)) < 0) {
int error = errno;
trace(LOG_ALWAYS, "Can't set SO_KEEPALIVE option on a socket: %s", errno_to_str(error));
- closesocket(sock);
errno = error;
return NULL;
}
@@ -686,10 +747,12 @@ static ChannelTCP * create_channel(int sock, int en_ssl, int server, int unix_do
if (!err && (fp = fopen(fnm, "r")) == NULL) err = errno;
if (!err && (rsa_key = PEM_read_RSAPrivateKey(fp, NULL, NULL, NULL)) == NULL) err = set_ssl_errno();
if (!err && fclose(fp) != 0) err = errno;
- snprintf(fnm, sizeof(fnm), "%s/ssl/local.cert", tcf_dir);
- if (!err && (fp = fopen(fnm, "r")) == NULL) err = errno;
- if (!err && (ssl_cert = PEM_read_X509(fp, NULL, NULL, NULL)) == NULL) err = set_ssl_errno();
- if (!err && fclose(fp) != 0) err = errno;
+ if (!err) {
+ snprintf(fnm, sizeof(fnm), "%s/ssl/local.cert", tcf_dir);
+ if (!err && (fp = fopen(fnm, "r")) == NULL) err = errno;
+ if (!err && (ssl_cert = PEM_read_X509(fp, NULL, NULL, NULL)) == NULL) err = set_ssl_errno();
+ if (!err && fclose(fp) != 0) err = errno;
+ }
if (err) {
trace(LOG_ALWAYS, "Cannot read SSL certificate %s: %s", fnm, errno_to_str(err));
set_errno(err, "Cannot read SSL certificate");
@@ -743,21 +806,24 @@ static ChannelTCP * create_channel(int sock, int en_ssl, int server, int unix_do
c->ibuf.trigger_message = tcp_trigger_message;
c->socket = sock;
c->lock_cnt = 1;
- c->rdreq.done = tcp_channel_read_done;
- c->rdreq.client_data = c;
+ c->rd_req.done = tcp_channel_read_done;
+ c->rd_req.client_data = c;
if (c->ssl) {
#if ENABLE_SSL
- c->rdreq.type = AsyncReqSelect;
- c->rdreq.u.select.nfds = c->socket + 1;
+ c->rd_req.type = AsyncReqSelect;
+ c->rd_req.u.select.nfds = c->socket + 1;
#else
assert(0);
#endif
}
else {
- c->rdreq.type = AsyncReqRecv;
- c->rdreq.u.sio.sock = c->socket;
- c->rdreq.u.sio.flags = 0;
+ c->rd_req.type = AsyncReqRecv;
+ c->rd_req.u.sio.sock = c->socket;
+ c->rd_req.u.sio.flags = 0;
}
+#if ENABLE_OutputQueue
+ output_queue_ini(&c->out_queue);
+#endif
return c;
}
@@ -860,7 +926,6 @@ static void set_peer_addr(ChannelTCP * c, struct sockaddr * addr, int addr_len)
static void tcp_server_accept_done(void * x) {
AsyncReqInfo * req = (AsyncReqInfo *)x;
ServerTCP * si = (ServerTCP *)req->client_data;
- ChannelTCP * c = NULL;
if (si->sock < 0) {
/* Server closed. */
@@ -868,16 +933,20 @@ static void tcp_server_accept_done(void * x) {
return;
}
if (req->error) {
- trace(LOG_ALWAYS, "socket accept failed: %d %s", req->error, errno_to_str(req->error));
- si->accreq.u.acc.addrlen = si->addr_len;
- async_req_post(req);
- return;
+ trace(LOG_ALWAYS, "Socket accept failed: %s", errno_to_str(req->error));
}
- c = create_channel(req->u.acc.rval, strcmp(peer_server_getprop(si->ps, "TransportName", ""), "SSL") == 0, 1,
- si->addr_buf->sa_family == AF_UNIX);
- if (c != NULL) {
- set_peer_addr(c, si->addr_buf, si->addr_len);
- si->serv.new_conn(&si->serv, &c->chan);
+ else {
+ int ssl = strcmp(peer_server_getprop(si->ps, "TransportName", ""), "SSL") == 0;
+ int unix_domain = si->addr_buf->sa_family == AF_UNIX;
+ ChannelTCP * c = create_channel(req->u.acc.rval, ssl, 1, unix_domain);
+ if (c == NULL) {
+ trace(LOG_ALWAYS, "Cannot create channel for accepted connection: %s", errno_to_str(errno));
+ closesocket(req->u.acc.rval);
+ }
+ else {
+ set_peer_addr(c, si->addr_buf, si->addr_len);
+ si->serv.new_conn(&si->serv, &c->chan);
+ }
}
si->accreq.u.acc.addrlen = si->addr_len;
async_req_post(req);
diff --git a/framework/outputbuf.c b/framework/outputbuf.c
new file mode 100644
index 00000000..a4ca8672
--- /dev/null
+++ b/framework/outputbuf.c
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Copyright (q) 2007, 2010 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+
+/*
+ * Uttility module that implements an abstarct output queue.
+ */
+
+#include <config.h>
+#include <assert.h>
+#include <framework/outputbuf.h>
+#include <framework/myalloc.h>
+#include <framework/trace.h>
+#include <framework/errors.h>
+
+#define link2buf(A) ((OutputBuffer *)((char *)(A) - offsetof(OutputBuffer, link)))
+
+void output_queue_ini(OutputQueue * q) {
+ list_init(&q->queue);
+ list_init(&q->pool);
+}
+
+void output_queue_add(OutputQueue * q, const void * buf, size_t size) {
+ if (q->error) return;
+ if (q->queue.next != q->queue.prev) {
+ /* Append data to the last pending buffer */
+ size_t gap = 0;
+ OutputBuffer * bf = link2buf(q->queue.prev);
+ assert(bf->buf_pos == 0);
+ gap = sizeof(bf->buf) - bf->buf_len;
+ if (gap > 0) {
+ size_t len = size;
+ if (len > gap) len = gap;
+ memcpy(bf->buf + bf->buf_len, buf, len);
+ bf->buf_len += len;
+ buf = (const char *)buf + len;
+ size -= len;
+ }
+ }
+ while (size > 0) {
+ size_t len = size;
+ OutputBuffer * bf = NULL;
+ if (list_is_empty(&q->pool)) {
+ bf = (OutputBuffer *)loc_alloc_zero(sizeof(OutputBuffer));
+ bf->queue = q;
+ }
+ else {
+ bf = link2buf(q->pool.next);
+ list_remove(&bf->link);
+ q->pool_size--;
+ }
+ if (len > sizeof(bf->buf)) len = sizeof(bf->buf);
+ bf->buf_pos = 0;
+ bf->buf_len = len;
+ memcpy(bf->buf, buf, len);
+ list_add_last(&bf->link, &q->queue);
+ if (q->queue.next == &bf->link) {
+ q->post_io_request(bf);
+ }
+ buf = (const char *)buf + len;
+ size -= len;
+ }
+}
+
+void output_queue_done(OutputQueue * q, int error, int size) {
+ OutputBuffer * bf = link2buf(q->queue.next);
+
+ assert(q->error == 0);
+ if (error) {
+ q->error = error;
+ trace(LOG_PROTOCOL, "Can't write() on output queue %#lx: %s", q, errno_to_str(q->error));
+ output_queue_clear(q);
+ }
+ else {
+ bf->buf_pos += size;
+ if (bf->buf_pos < bf->buf_len) {
+ /* Nothing */
+ }
+ else if (q->pool_size < 8) {
+ list_remove(&bf->link);
+ list_add_last(&bf->link, &q->pool);
+ q->pool_size++;
+ }
+ else {
+ list_remove(&bf->link);
+ loc_free(bf);
+ }
+ }
+ if (!list_is_empty(&q->queue)) {
+ bf = link2buf(q->queue.next);
+ q->post_io_request(bf);
+ }
+}
+
+void output_queue_clear(OutputQueue * q) {
+ while (!list_is_empty(&q->queue)) {
+ OutputBuffer * bf = link2buf(q->queue.next);
+ list_remove(&bf->link);
+ loc_free(bf);
+ }
+ while (!list_is_empty(&q->pool)) {
+ OutputBuffer * bf = link2buf(q->pool.next);
+ list_remove(&bf->link);
+ loc_free(bf);
+ }
+}
diff --git a/framework/outputbuf.h b/framework/outputbuf.h
new file mode 100644
index 00000000..a5f56bde
--- /dev/null
+++ b/framework/outputbuf.h
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+
+/*
+ * Uttility module that implements an abstarct output queue.
+ */
+
+#ifndef D_outputbuf
+#define D_outputbuf
+
+#include <config.h>
+#include <framework/link.h>
+
+typedef struct OutputQueue OutputQueue;
+typedef struct OutputBuffer OutputBuffer;
+
+struct OutputQueue {
+ int error;
+ LINK queue;
+ LINK pool;
+ int pool_size;
+ void (*post_io_request)(OutputBuffer *);
+};
+
+struct OutputBuffer {
+ LINK link;
+ OutputQueue * queue;
+ char buf[128 * MEM_USAGE_FACTOR];
+ size_t buf_len;
+ size_t buf_pos;
+};
+
+#define output_queue_is_empty(q) (list_is_empty(&(q)->queue))
+
+extern void output_queue_ini(OutputQueue * q);
+extern void output_queue_add(OutputQueue * q, const void * buf, size_t size);
+extern void output_queue_done(OutputQueue * q, int error, int size);
+extern void output_queue_clear(OutputQueue * q);
+
+#endif /* D_outputbuf */
diff --git a/server/server.vcproj b/server/server.vcproj
index b4661bea..04943e17 100644
--- a/server/server.vcproj
+++ b/server/server.vcproj
@@ -467,6 +467,14 @@
>
</File>
<File
+ RelativePath="..\agent\framework\outputbuf.c"
+ >
+ </File>
+ <File
+ RelativePath="..\agent\framework\outputbuf.h"
+ >
+ </File>
+ <File
RelativePath="..\agent\framework\peer.c"
>
</File>

Back to the top