diff options
author | eutarass | 2009-05-07 21:36:12 +0000 |
---|---|---|
committer | eutarass | 2009-05-07 21:36:12 +0000 |
commit | 39b068b22fb18bd09a74c812f0257a86279f5163 (patch) | |
tree | d231e2ae4ffb38b6fbc714c18458595237e9abaa /filesystem.c | |
parent | 351860b9e70daa2066c2e73a59c7c15d117e7716 (diff) | |
download | org.eclipse.tcf.agent-39b068b22fb18bd09a74c812f0257a86279f5163.tar.gz org.eclipse.tcf.agent-39b068b22fb18bd09a74c812f0257a86279f5163.tar.xz org.eclipse.tcf.agent-39b068b22fb18bd09a74c812f0257a86279f5163.zip |
FileSystem service now uses asynchronous I/O: read/write commands for different files are executed in parallel.
Diffstat (limited to 'filesystem.c')
-rw-r--r-- | filesystem.c | 445 |
1 files changed, 308 insertions, 137 deletions
diff --git a/filesystem.c b/filesystem.c index 5dd3ee85..8f894472 100644 --- a/filesystem.c +++ b/filesystem.c @@ -32,6 +32,7 @@ # include <dirent.h> #endif #include "myalloc.h" +#include "asyncreq.h" #include "streams.h" #include "channel.h" #include "link.h" @@ -64,7 +65,25 @@ static const int FSERR_NO_SUCH_FILE = 0x10002, FSERR_PERMISSION_DENIED = 0x10003; +#define REQ_READ 1 +#define REQ_WRITE 2 +#define REQ_FSTAT 3 +#define REQ_FSETSTAT 4 +#define REQ_CLOSE 5 + typedef struct OpenFileInfo OpenFileInfo; +typedef struct IORequest IORequest; +typedef struct FileAttrs FileAttrs; + +struct FileAttrs { + int flags; + int64 size; + int uid; + int gid; + int permissions; + int64 atime; + int64 mtime; +}; struct OpenFileInfo { unsigned long handle; @@ -72,24 +91,26 @@ struct OpenFileInfo { int file; DIR * dir; InputStream * inp; + OutputStream * out; LINK link_ring; LINK link_hash; + LINK link_reqs; + IORequest * posted_req; +}; + +struct IORequest { + int req; + char token[256]; + OpenFileInfo * handle; + int64 offset; + FileAttrs attrs; + AsyncReqInfo info; + LINK link_reqs; }; #define hash2file(A) ((OpenFileInfo *)((char *)(A) - (int)&((OpenFileInfo *)0)->link_hash)) #define ring2file(A) ((OpenFileInfo *)((char *)(A) - (int)&((OpenFileInfo *)0)->link_ring)) - -typedef struct FileAttrs FileAttrs; - -struct FileAttrs { - int flags; - int64 size; - int uid; - int gid; - int permissions; - int64 atime; - int64 mtime; -}; +#define reqs2req(A) ((IORequest *)((char *)(A) - (int)&((IORequest *)0)->link_reqs)) static unsigned long handle_cnt = 0; @@ -101,7 +122,7 @@ static LINK file_info_ring = { NULL, NULL }; # define FS_ROOT "host:c:/" #endif -static OpenFileInfo * create_open_file_info(InputStream * inp, char * path, int file, DIR * dir) { +static OpenFileInfo * create_open_file_info(Channel * ch, char * path, int file, DIR * dir) { int i = 0; LINK * list_head = NULL; @@ -122,9 +143,11 @@ static OpenFileInfo * create_open_file_info(InputStream * inp, char * path, int strcpy(h->path, path); h->file = file; h->dir = dir; - h->inp = inp; + h->inp = &ch->inp; + h->out = &ch->out; list_add_first(&h->link_ring, &file_info_ring); list_add_first(&h->link_hash, list_head); + list_init(&h->link_reqs); return h; } @@ -144,6 +167,7 @@ static OpenFileInfo * find_open_file_info(char * id) { } static void delete_open_file_info(OpenFileInfo * h) { + assert(list_is_empty(&h->link_reqs)); list_remove(&h->link_ring); list_remove(&h->link_hash); loc_free(h); @@ -166,6 +190,18 @@ static void channel_close_listener(Channel * c) { if (h->file >= 0) { close(h->file); h->file = -1; + while (!list_is_empty(&h->link_reqs)) { + LINK * link = h->link_reqs.next; + IORequest * req = reqs2req(link); + list_remove(link); + if (h->posted_req == req) { + req->handle = NULL; + } + else { + loc_free(req->info.u.fio.bufp); + loc_free(req); + } + } } list_add_last(&h->link_hash, &list); } @@ -362,7 +398,7 @@ static void command_open(char * token, Channel * c) { err = errno; } else { - handle = create_open_file_info(&c->inp, path, file, NULL); + handle = create_open_file_info(c, path, file, NULL); } write_stringz(&c->out, "R"); @@ -372,6 +408,212 @@ static void command_open(char * token, Channel * c) { write_stream(&c->out, MARKER_EOM); } +static void reply_close(char * token, OutputStream * out, int err) { + write_stringz(out, "R"); + write_stringz(out, token); + write_fs_errno(out, err); + write_stream(out, MARKER_EOM); +} + +static void reply_read(char * token, OutputStream * out, int err, void * buf, unsigned len, int eof) { + write_stringz(out, "R"); + write_stringz(out, token); + if (buf != NULL) { + JsonWriteBinaryState state; + + json_write_binary_start(&state, out); + json_write_binary_data(&state, buf, len); + json_write_binary_end(&state); + write_stream(out, 0); + } + else { + write_stringz(out, "null"); + } + write_fs_errno(out, err); + json_write_boolean(out, eof); + write_stream(out, 0); + write_stream(out, MARKER_EOM); +} + +static void reply_write(char * token, OutputStream * out, int err) { + write_stringz(out, "R"); + write_stringz(out, token); + write_fs_errno(out, err); + write_stream(out, MARKER_EOM); +} + +static void reply_stat(char * token, OutputStream * out, int err, struct_stat * buf) { + FileAttrs attrs; + + if (err == 0) fill_attrs(&attrs, buf); + else memset(&attrs, 0, sizeof(attrs)); + + write_stringz(out, "R"); + write_stringz(out, token); + write_fs_errno(out, err); + write_file_attrs(out, &attrs); + write_stream(out, 0); + write_stream(out, MARKER_EOM); +} + +static void reply_setstat(char * token, OutputStream * out, int err) { + write_stringz(out, "R"); + write_stringz(out, token); + write_fs_errno(out, err); + write_stream(out, MARKER_EOM); +} + +static void post_io_requst(OpenFileInfo * handle); + +static void done_io_request(void * arg) { + int err = 0; + IORequest * req = (IORequest *)((AsyncReqInfo *)arg)->client_data; + OpenFileInfo * handle = req->handle; + + if (handle == NULL) { + loc_free(req->info.u.fio.bufp); + loc_free(req); + return; + } + + assert(handle->posted_req == req); + assert(&handle->posted_req->link_reqs == handle->link_reqs.next); + handle->posted_req = NULL; + list_remove(&req->link_reqs); + + switch (req->req) { + case REQ_READ: + if (req->info.error) { + reply_read(req->token, handle->out, req->info.error, NULL, 0, 0); + } + else { + reply_read(req->token, handle->out, 0, + req->info.u.fio.bufp, req->info.u.fio.rval, + (size_t)req->info.u.fio.rval < req->info.u.fio.bufsz); + } + break; + case REQ_WRITE: + if (req->info.error) err = req->info.error; + else if ((size_t)req->info.u.fio.rval < req->info.u.fio.bufsz) err = ENOSPC; + reply_write(req->token, handle->out, err); + break; + case REQ_CLOSE: + err = req->info.error; + reply_close(req->token, handle->out, err); + if (err == 0) { + loc_free(req); + while (!list_is_empty(&handle->link_reqs)) { + LINK * link = handle->link_reqs.next; + req = reqs2req(link); + switch (req->req) { + case REQ_READ: + reply_read(req->token, handle->out, EBADF, NULL, 0, 0); + break; + case REQ_WRITE: + reply_write(req->token, handle->out, EBADF); + break; + case REQ_FSTAT: + reply_stat(req->token, handle->out, EBADF, NULL); + break; + case REQ_FSETSTAT: + reply_setstat(req->token, handle->out, EBADF); + break; + case REQ_CLOSE: + reply_close(req->token, handle->out, EBADF); + break; + default: + assert(0); + } + list_remove(link); + loc_free(req->info.u.fio.bufp); + loc_free(req); + } + flush_stream(handle->out); + delete_open_file_info(handle); + return; + } + break; + default: + assert(0); + } + + loc_free(req->info.u.fio.bufp); + loc_free(req); + post_io_requst(handle); + flush_stream(handle->out); +} + +static void post_io_requst(OpenFileInfo * handle) { + while (handle->posted_req == NULL && !list_is_empty(&handle->link_reqs)) { + LINK * link = handle->link_reqs.next; + IORequest * req = reqs2req(link); + switch (req->req) { + case REQ_READ: + case REQ_WRITE: + if (lseek(handle->file, req->offset, SEEK_SET) == -1) { + assert(errno != 0); + req->info.error = errno; + handle->posted_req = req; + post_event(done_io_request, &req->info); + return; + } + break; + case REQ_FSTAT: + { + int err = 0; + struct_stat buf; + memset(&buf, 0, sizeof(buf)); + if (fstat(handle->file, &buf) < 0) err = errno; + reply_stat(req->token, handle->out, err, &buf); + list_remove(link); + } + continue; + case REQ_FSETSTAT: + { + int err = 0; + FileAttrs attrs = req->attrs; + if (attrs.flags & ATTR_SIZE) { + if (ftruncate(handle->file, attrs.size) < 0) err = errno; + } +#if defined(WIN32) || defined(_WRS_KERNEL) + if (attrs.flags & ATTR_PERMISSIONS) { + if (chmod(handle->path, attrs.permissions) < 0) err = errno; + } +#else + if (attrs.flags & ATTR_UIDGID) { + if (fchown(handle->file, attrs.uid, attrs.gid) < 0) err = errno; + } + if (attrs.flags & ATTR_PERMISSIONS) { + if (fchmod(handle->file, attrs.permissions) < 0) err = errno; + } +#endif + if (attrs.flags & ATTR_ACMODTIME) { + struct utimbuf buf; + buf.actime = (long)(attrs.atime / 1000); + buf.modtime = (long)(attrs.mtime / 1000); + if (utime(handle->path, &buf) < 0) err = errno; + } + reply_setstat(req->token, handle->out, err); + list_remove(link); + } + continue; + } + handle->posted_req = req; + async_req_post(&req->info); + } +} + +static IORequest * create_io_request(char * token, OpenFileInfo * handle, int type) { + IORequest * req = loc_alloc_zero(sizeof(IORequest)); + req->req = type; + req->handle = handle; + req->info.done = done_io_request; + req->info.client_data = req; + strncpy(req->token, token, sizeof(req->token) - 1); + list_add_last(&req->link_reqs, &handle->link_reqs); + return req; +} + static void command_close(char * token, Channel * c) { char id[256]; OpenFileInfo * h = NULL; @@ -394,28 +636,21 @@ static void command_close(char * token, Channel * c) { } } else { - if (close(h->file) < 0) { - err = errno; - } - else { - delete_open_file_info(h); - } + IORequest * req = create_io_request(token, h, REQ_CLOSE); + req->info.type = AsyncReqClose; + req->info.u.fio.fd = h->file; + post_io_requst(h); + return; } - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_fs_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); + reply_close(token, &c->out, err); } static void command_read(char * token, Channel * c) { char id[256]; OpenFileInfo * h = NULL; - int err = 0; - int eof = 0; int64 offset; unsigned long len; - unsigned long cnt = 0; json_read_string(&c->inp, id, sizeof(id)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); @@ -425,59 +660,31 @@ static void command_read(char * token, Channel * c) { if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - h = find_open_file_info(id); if (h == NULL) { - err = EBADF; - write_stringz(&c->out, "null"); + reply_read(token, &c->out, EBADF, NULL, 0, 0); } else { - char buf[BUF_SIZE]; - JsonWriteBinaryState state; - json_write_binary_start(&state, &c->out); - while (cnt < len) { - if (lseek(h->file, offset + cnt, SEEK_SET) == -1) { - assert(errno != 0); - err = errno; - break; - } - else { - int rd = read(h->file, buf, BUF_SIZE < len - cnt ? BUF_SIZE : len - cnt); - if (rd < 0) { - assert(errno != 0); - err = errno; - break; - } - if (rd == 0) { - assert(cnt < len); - eof = 1; - break; - } - json_write_binary_data(&state, buf, rd); - cnt += rd; - } - } - json_write_binary_end(&state); - write_stream(&c->out, 0); + IORequest * req = create_io_request(token, h, REQ_READ); + req->offset = offset; + req->info.type = AsyncReqRead; + req->info.u.fio.fd = h->file; + req->info.u.fio.bufp = loc_alloc(len); + req->info.u.fio.bufsz = len; + post_io_requst(h); } - - assert(err || eof || cnt == len); - write_fs_errno(&c->out, err); - json_write_boolean(&c->out, eof); - write_stream(&c->out, 0); - write_stream(&c->out, MARKER_EOM); } static void command_write(char * token, Channel * c) { char id[256]; OpenFileInfo * h = NULL; - int err = 0; int64 offset; unsigned long len = 0; JsonReadBinaryState state; + static size_t buf_size = 0; + static char * buf = NULL; + json_read_string(&c->inp, id, sizeof(id)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); offset = json_read_int64(&c->inp); @@ -486,43 +693,33 @@ static void command_write(char * token, Channel * c) { json_read_binary_start(&state, &c->inp); h = find_open_file_info(id); - if (h == NULL) err = EBADF; for (;;) { - char buf[BUF_SIZE]; - int rd = json_read_binary_data(&state, buf, sizeof(buf)); - if (rd == 0) break; - if (err == 0 && lseek(h->file, offset + len, SEEK_SET) == -1) { - err = errno; - } - if (err == 0) { - int wr = write(h->file, buf, rd); - if (wr < 0) err = errno; - else if (wr < rd) err = ENOSPC; + int rd; + if (buf_size < len + BUF_SIZE) { + buf_size += BUF_SIZE; + buf = loc_realloc(buf, buf_size); } + rd = json_read_binary_data(&state, buf + len, buf_size - len); + if (rd == 0) break; len += rd; } json_read_binary_end(&state); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_fs_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); -} - -static void write_stat_result(char * token, Channel * c, int err, struct_stat * buf) { - FileAttrs attrs; - - if (err == 0) fill_attrs(&attrs, buf); - else memset(&attrs, 0, sizeof(attrs)); - - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_fs_errno(&c->out, err); - write_file_attrs(&c->out, &attrs); - write_stream(&c->out, 0); - write_stream(&c->out, MARKER_EOM); + if (h == NULL) { + reply_write(token, &c->out, EBADF); + } + else { + IORequest * req = create_io_request(token, h, REQ_WRITE); + req->offset = offset; + req->info.type = AsyncReqWrite; + req->info.u.fio.fd = h->file; + req->info.u.fio.bufp = loc_alloc(len); + req->info.u.fio.bufsz = len; + memcpy(req->info.u.fio.bufp, buf, len); + post_io_requst(h); + } } static void command_stat(char * token, Channel * c) { @@ -537,7 +734,7 @@ static void command_stat(char * token, Channel * c) { memset(&buf, 0, sizeof(buf)); if (stat(path, &buf) < 0) err = errno; - write_stat_result(token, c, err, &buf); + reply_stat(token, &c->out, err, &buf); } static void command_lstat(char * token, Channel * c) { @@ -552,24 +749,25 @@ static void command_lstat(char * token, Channel * c) { memset(&buf, 0, sizeof(buf)); if (lstat(path, &buf) < 0) err = errno; - write_stat_result(token, c, err, &buf); + reply_stat(token, &c->out, err, &buf); } static void command_fstat(char * token, Channel * c) { char id[256]; - struct_stat buf; OpenFileInfo * h = NULL; - int err = 0; json_read_string(&c->inp, id, sizeof(id)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); h = find_open_file_info(id); - memset(&buf, 0, sizeof(buf)); - if (h == NULL) err = EBADF; - else if (fstat(h->file, &buf) < 0) err = errno; - write_stat_result(token, c, err, &buf); + if (h == NULL) { + reply_stat(token, &c->out, EBADF, NULL); + } + else { + create_io_request(token, h, REQ_FSTAT); + post_io_requst(h); + } } static void command_setstat(char * token, Channel * c) { @@ -602,17 +800,13 @@ static void command_setstat(char * token, Channel * c) { if (utime(path, &buf) < 0) err = errno; } - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_fs_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); + reply_setstat(token, &c->out, err); } static void command_fsetstat(char * token, Channel * c) { char id[256]; FileAttrs attrs; OpenFileInfo * h = NULL; - int err = 0; json_read_string(&c->inp, id, sizeof(id)); if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); @@ -623,36 +817,13 @@ static void command_fsetstat(char * token, Channel * c) { h = find_open_file_info(id); if (h == NULL) { - err = EBADF; + reply_setstat(token, &c->out, EBADF); } else { - if (attrs.flags & ATTR_SIZE) { - if (ftruncate(h->file, attrs.size) < 0) err = errno; - } -#if defined(WIN32) || defined(_WRS_KERNEL) - if (attrs.flags & ATTR_PERMISSIONS) { - if (chmod(h->path, attrs.permissions) < 0) err = errno; - } -#else - if (attrs.flags & ATTR_UIDGID) { - if (fchown(h->file, attrs.uid, attrs.gid) < 0) err = errno; - } - if (attrs.flags & ATTR_PERMISSIONS) { - if (fchmod(h->file, attrs.permissions) < 0) err = errno; - } -#endif - if (attrs.flags & ATTR_ACMODTIME) { - struct utimbuf buf; - buf.actime = (long)(attrs.atime / 1000); - buf.modtime = (long)(attrs.mtime / 1000); - if (utime(h->path, &buf) < 0) err = errno; - } + IORequest * req = create_io_request(token, h, REQ_FSETSTAT); + req->attrs = attrs; + post_io_requst(h); } - - write_stringz(&c->out, "R"); - write_stringz(&c->out, token); - write_fs_errno(&c->out, err); - write_stream(&c->out, MARKER_EOM); } static void command_opendir(char * token, Channel * c) { @@ -670,7 +841,7 @@ static void command_opendir(char * token, Channel * c) { err = errno; } else { - handle = create_open_file_info(&c->inp, path, -1, dir); + handle = create_open_file_info(c, path, -1, dir); } write_stringz(&c->out, "R"); |