diff options
author | eutarass | 2009-07-07 04:36:56 +0000 |
---|---|---|
committer | eutarass | 2009-07-07 04:36:56 +0000 |
commit | 8f7e15604445dd62d426556bd1a648c84f4eda1c (patch) | |
tree | 0c048923311eedf287ac7b5d4ab9a82de124f7ab | |
parent | db66c239997c0a03e1c3ce77750cb4d02f3a48c7 (diff) | |
download | org.eclipse.tcf.agent-8f7e15604445dd62d426556bd1a648c84f4eda1c.tar.gz org.eclipse.tcf.agent-8f7e15604445dd62d426556bd1a648c84f4eda1c.tar.xz org.eclipse.tcf.agent-8f7e15604445dd62d426556bd1a648c84f4eda1c.zip |
Processes service: added support for process std input streams
-rw-r--r-- | processes.c | 85 |
1 files changed, 82 insertions, 3 deletions
diff --git a/processes.c b/processes.c index 032ffb72..2f9e7c56 100644 --- a/processes.c +++ b/processes.c @@ -86,6 +86,7 @@ typedef struct ChildProcess { int inp; int out; int err; + struct ProcessInput * inp_struct; char inp_id[256]; char out_id[256]; char err_id[256]; @@ -102,6 +103,17 @@ typedef struct ProcessOutput { VirtualStream * vstream; } ProcessOutput; +typedef struct ProcessInput { + ChildProcess * prs; + AsyncReqInfo req; + int req_posted; + char buf[PIPE_SIZE]; + size_t buf_pos; + size_t buf_len; + int eos; + VirtualStream * vstream; +} ProcessInput; + #define link2prs(A) ((ChildProcess *)((char *)(A) - offsetof(ChildProcess, link))) static LINK prs_list; @@ -653,13 +665,78 @@ static void process_exited(ChildProcess * prs) { close(prs->inp); close(prs->out); if (prs->out != prs->err) close(prs->err); + if (prs->inp_struct) { + ProcessInput * inp = prs->inp_struct; + if (!inp->req_posted) { + virtual_stream_delete(inp->vstream); + loc_free(inp); + } + else { + inp->prs = NULL; + } + } loc_free(prs); #if defined(_WRS_KERNEL) semGive(prs_list_lock); #endif } -static void streams_callback(VirtualStream * stream, int event_code, void * args) { +static void process_input_streams_callback(VirtualStream * stream, int event_code, void * args) { + ProcessInput * inp = (ProcessInput *)args; + + assert(inp->vstream == stream); + if (!inp->req_posted) { + if (inp->buf_pos >= inp->buf_len && !inp->eos) { + inp->buf_pos = inp->buf_len = 0; + virtual_stream_get_data(stream, inp->buf, sizeof(inp->buf), &inp->buf_len, &inp->eos); + } + if (inp->buf_pos < inp->buf_len) { + inp->req.u.fio.bufp = inp->buf + inp->buf_pos; + inp->req.u.fio.bufsz = inp->buf_len - inp->buf_pos; + inp->req_posted = 1; + async_req_post(&inp->req); + } + } +} + +static void write_process_input_done(void * x) { + AsyncReqInfo * req = x; + ProcessInput * inp = (ProcessInput *)req->client_data; + + inp->req_posted = 0; + if (inp->prs == NULL) { + /* Process has exited */ + virtual_stream_delete(inp->vstream); + loc_free(inp); + } + else { + int wr = inp->req.u.fio.rval; + + if (wr < 0) { + int err = inp->req.error; + trace(LOG_ALWAYS, "Can't write process input stream: %d %s", err, errno_to_str(err)); + inp->buf_pos = inp->buf_len = 0; + } + else { + inp->buf_pos += wr; + } + + process_input_streams_callback(inp->vstream, 0, inp); + } +} + +static void write_process_input(ChildProcess * prs) { + ProcessInput * inp = prs->inp_struct = loc_alloc_zero(sizeof(ProcessInput)); + inp->prs = prs; + inp->req.client_data = inp; + inp->req.done = write_process_input_done; + inp->req.type = AsyncReqWrite; + inp->req.u.fio.fd = prs->inp; + virtual_stream_create(PROCESSES, 0x1000, VS_ENABLE_REMOTE_WRITE, process_input_streams_callback, inp, &inp->vstream); + virtual_stream_get_id(inp->vstream, prs->inp_id, sizeof(prs->inp_id)); +} + +static void process_output_streams_callback(VirtualStream * stream, int event_code, void * args) { ProcessOutput * out = (ProcessOutput *)args; assert(out->vstream == stream); @@ -677,6 +754,7 @@ static void streams_callback(VirtualStream * stream, int event_code, void * args assert(buf_len <= sizeof(out->buf)); assert(out->buf_pos <= (size_t)buf_len); assert(out->req.u.fio.bufp == out->buf); + if (err) trace(LOG_ALWAYS, "Can't read process output stream: %d %s", err, errno_to_str(err)); if (out->buf_pos < (size_t)buf_len || out->eos != eos) { unsigned done = 0; @@ -707,7 +785,7 @@ static void read_process_output_done(void * x) { out->buf_pos = 0; out->req_posted = 0; - streams_callback(out->vstream, 0, out); + process_output_streams_callback(out->vstream, 0, out); } static void read_process_output(ChildProcess * prs, int fd, char * id, size_t id_size) { @@ -719,7 +797,7 @@ static void read_process_output(ChildProcess * prs, int fd, char * id, size_t id out->req.u.fio.bufp = out->buf; out->req.u.fio.bufsz = sizeof(out->buf); out->req.u.fio.fd = fd; - virtual_stream_create(PROCESSES, 0x1000, VS_ENABLE_REMOTE_READ, streams_callback, out, &out->vstream); + virtual_stream_create(PROCESSES, 0x1000, VS_ENABLE_REMOTE_READ, process_output_streams_callback, out, &out->vstream); virtual_stream_get_id(out->vstream, id, id_size); out->req_posted = 1; async_req_post(&out->req); @@ -1134,6 +1212,7 @@ static void command_start(char * token, Channel * c) { if (dir[0] != 0 && chdir(dir) < 0) err = errno; if (err == 0 && start_process(c, envp, dir, exe, args, attach, &pid, &selfattach, &prs) < 0) err = errno; if (prs != NULL) { + write_process_input(prs); read_process_output(prs, prs->out, prs->out_id, sizeof(prs->out_id)); if (prs->out != prs->err) read_process_output(prs, prs->err, prs->err_id, sizeof(prs->err_id)); } |