Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreutarass2009-07-07 04:36:56 +0000
committereutarass2009-07-07 04:36:56 +0000
commit8f7e15604445dd62d426556bd1a648c84f4eda1c (patch)
tree0c048923311eedf287ac7b5d4ab9a82de124f7ab
parentdb66c239997c0a03e1c3ce77750cb4d02f3a48c7 (diff)
downloadorg.eclipse.tcf.agent-8f7e15604445dd62d426556bd1a648c84f4eda1c.tar.gz
org.eclipse.tcf.agent-8f7e15604445dd62d426556bd1a648c84f4eda1c.tar.xz
org.eclipse.tcf.agent-8f7e15604445dd62d426556bd1a648c84f4eda1c.zip
Processes service: added support for process std input streams
-rw-r--r--processes.c85
1 files changed, 82 insertions, 3 deletions
diff --git a/processes.c b/processes.c
index 032ffb72..2f9e7c56 100644
--- a/processes.c
+++ b/processes.c
@@ -86,6 +86,7 @@ typedef struct ChildProcess {
int inp;
int out;
int err;
+ struct ProcessInput * inp_struct;
char inp_id[256];
char out_id[256];
char err_id[256];
@@ -102,6 +103,17 @@ typedef struct ProcessOutput {
VirtualStream * vstream;
} ProcessOutput;
+typedef struct ProcessInput {
+ ChildProcess * prs;
+ AsyncReqInfo req;
+ int req_posted;
+ char buf[PIPE_SIZE];
+ size_t buf_pos;
+ size_t buf_len;
+ int eos;
+ VirtualStream * vstream;
+} ProcessInput;
+
#define link2prs(A) ((ChildProcess *)((char *)(A) - offsetof(ChildProcess, link)))
static LINK prs_list;
@@ -653,13 +665,78 @@ static void process_exited(ChildProcess * prs) {
close(prs->inp);
close(prs->out);
if (prs->out != prs->err) close(prs->err);
+ if (prs->inp_struct) {
+ ProcessInput * inp = prs->inp_struct;
+ if (!inp->req_posted) {
+ virtual_stream_delete(inp->vstream);
+ loc_free(inp);
+ }
+ else {
+ inp->prs = NULL;
+ }
+ }
loc_free(prs);
#if defined(_WRS_KERNEL)
semGive(prs_list_lock);
#endif
}
-static void streams_callback(VirtualStream * stream, int event_code, void * args) {
+static void process_input_streams_callback(VirtualStream * stream, int event_code, void * args) {
+ ProcessInput * inp = (ProcessInput *)args;
+
+ assert(inp->vstream == stream);
+ if (!inp->req_posted) {
+ if (inp->buf_pos >= inp->buf_len && !inp->eos) {
+ inp->buf_pos = inp->buf_len = 0;
+ virtual_stream_get_data(stream, inp->buf, sizeof(inp->buf), &inp->buf_len, &inp->eos);
+ }
+ if (inp->buf_pos < inp->buf_len) {
+ inp->req.u.fio.bufp = inp->buf + inp->buf_pos;
+ inp->req.u.fio.bufsz = inp->buf_len - inp->buf_pos;
+ inp->req_posted = 1;
+ async_req_post(&inp->req);
+ }
+ }
+}
+
+static void write_process_input_done(void * x) {
+ AsyncReqInfo * req = x;
+ ProcessInput * inp = (ProcessInput *)req->client_data;
+
+ inp->req_posted = 0;
+ if (inp->prs == NULL) {
+ /* Process has exited */
+ virtual_stream_delete(inp->vstream);
+ loc_free(inp);
+ }
+ else {
+ int wr = inp->req.u.fio.rval;
+
+ if (wr < 0) {
+ int err = inp->req.error;
+ trace(LOG_ALWAYS, "Can't write process input stream: %d %s", err, errno_to_str(err));
+ inp->buf_pos = inp->buf_len = 0;
+ }
+ else {
+ inp->buf_pos += wr;
+ }
+
+ process_input_streams_callback(inp->vstream, 0, inp);
+ }
+}
+
+static void write_process_input(ChildProcess * prs) {
+ ProcessInput * inp = prs->inp_struct = loc_alloc_zero(sizeof(ProcessInput));
+ inp->prs = prs;
+ inp->req.client_data = inp;
+ inp->req.done = write_process_input_done;
+ inp->req.type = AsyncReqWrite;
+ inp->req.u.fio.fd = prs->inp;
+ virtual_stream_create(PROCESSES, 0x1000, VS_ENABLE_REMOTE_WRITE, process_input_streams_callback, inp, &inp->vstream);
+ virtual_stream_get_id(inp->vstream, prs->inp_id, sizeof(prs->inp_id));
+}
+
+static void process_output_streams_callback(VirtualStream * stream, int event_code, void * args) {
ProcessOutput * out = (ProcessOutput *)args;
assert(out->vstream == stream);
@@ -677,6 +754,7 @@ static void streams_callback(VirtualStream * stream, int event_code, void * args
assert(buf_len <= sizeof(out->buf));
assert(out->buf_pos <= (size_t)buf_len);
assert(out->req.u.fio.bufp == out->buf);
+ if (err) trace(LOG_ALWAYS, "Can't read process output stream: %d %s", err, errno_to_str(err));
if (out->buf_pos < (size_t)buf_len || out->eos != eos) {
unsigned done = 0;
@@ -707,7 +785,7 @@ static void read_process_output_done(void * x) {
out->buf_pos = 0;
out->req_posted = 0;
- streams_callback(out->vstream, 0, out);
+ process_output_streams_callback(out->vstream, 0, out);
}
static void read_process_output(ChildProcess * prs, int fd, char * id, size_t id_size) {
@@ -719,7 +797,7 @@ static void read_process_output(ChildProcess * prs, int fd, char * id, size_t id
out->req.u.fio.bufp = out->buf;
out->req.u.fio.bufsz = sizeof(out->buf);
out->req.u.fio.fd = fd;
- virtual_stream_create(PROCESSES, 0x1000, VS_ENABLE_REMOTE_READ, streams_callback, out, &out->vstream);
+ virtual_stream_create(PROCESSES, 0x1000, VS_ENABLE_REMOTE_READ, process_output_streams_callback, out, &out->vstream);
virtual_stream_get_id(out->vstream, id, id_size);
out->req_posted = 1;
async_req_post(&out->req);
@@ -1134,6 +1212,7 @@ static void command_start(char * token, Channel * c) {
if (dir[0] != 0 && chdir(dir) < 0) err = errno;
if (err == 0 && start_process(c, envp, dir, exe, args, attach, &pid, &selfattach, &prs) < 0) err = errno;
if (prs != NULL) {
+ write_process_input(prs);
read_process_output(prs, prs->out, prs->out_id, sizeof(prs->out_id));
if (prs->out != prs->err) read_process_output(prs, prs->err, prs->err_id, sizeof(prs->err_id));
}

Back to the top