Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreutarass2009-05-07 21:36:12 +0000
committereutarass2009-05-07 21:36:12 +0000
commit39b068b22fb18bd09a74c812f0257a86279f5163 (patch)
treed231e2ae4ffb38b6fbc714c18458595237e9abaa /filesystem.c
parent351860b9e70daa2066c2e73a59c7c15d117e7716 (diff)
downloadorg.eclipse.tcf.agent-39b068b22fb18bd09a74c812f0257a86279f5163.tar.gz
org.eclipse.tcf.agent-39b068b22fb18bd09a74c812f0257a86279f5163.tar.xz
org.eclipse.tcf.agent-39b068b22fb18bd09a74c812f0257a86279f5163.zip
FileSystem service now uses asynchronous I/O: read/write commands for different files are executed in parallel.
Diffstat (limited to 'filesystem.c')
-rw-r--r--filesystem.c445
1 files changed, 308 insertions, 137 deletions
diff --git a/filesystem.c b/filesystem.c
index 5dd3ee85..8f894472 100644
--- a/filesystem.c
+++ b/filesystem.c
@@ -32,6 +32,7 @@
# include <dirent.h>
#endif
#include "myalloc.h"
+#include "asyncreq.h"
#include "streams.h"
#include "channel.h"
#include "link.h"
@@ -64,7 +65,25 @@ static const int
FSERR_NO_SUCH_FILE = 0x10002,
FSERR_PERMISSION_DENIED = 0x10003;
+#define REQ_READ 1
+#define REQ_WRITE 2
+#define REQ_FSTAT 3
+#define REQ_FSETSTAT 4
+#define REQ_CLOSE 5
+
typedef struct OpenFileInfo OpenFileInfo;
+typedef struct IORequest IORequest;
+typedef struct FileAttrs FileAttrs;
+
+struct FileAttrs {
+ int flags;
+ int64 size;
+ int uid;
+ int gid;
+ int permissions;
+ int64 atime;
+ int64 mtime;
+};
struct OpenFileInfo {
unsigned long handle;
@@ -72,24 +91,26 @@ struct OpenFileInfo {
int file;
DIR * dir;
InputStream * inp;
+ OutputStream * out;
LINK link_ring;
LINK link_hash;
+ LINK link_reqs;
+ IORequest * posted_req;
+};
+
+struct IORequest {
+ int req;
+ char token[256];
+ OpenFileInfo * handle;
+ int64 offset;
+ FileAttrs attrs;
+ AsyncReqInfo info;
+ LINK link_reqs;
};
#define hash2file(A) ((OpenFileInfo *)((char *)(A) - (int)&((OpenFileInfo *)0)->link_hash))
#define ring2file(A) ((OpenFileInfo *)((char *)(A) - (int)&((OpenFileInfo *)0)->link_ring))
-
-typedef struct FileAttrs FileAttrs;
-
-struct FileAttrs {
- int flags;
- int64 size;
- int uid;
- int gid;
- int permissions;
- int64 atime;
- int64 mtime;
-};
+#define reqs2req(A) ((IORequest *)((char *)(A) - (int)&((IORequest *)0)->link_reqs))
static unsigned long handle_cnt = 0;
@@ -101,7 +122,7 @@ static LINK file_info_ring = { NULL, NULL };
# define FS_ROOT "host:c:/"
#endif
-static OpenFileInfo * create_open_file_info(InputStream * inp, char * path, int file, DIR * dir) {
+static OpenFileInfo * create_open_file_info(Channel * ch, char * path, int file, DIR * dir) {
int i = 0;
LINK * list_head = NULL;
@@ -122,9 +143,11 @@ static OpenFileInfo * create_open_file_info(InputStream * inp, char * path, int
strcpy(h->path, path);
h->file = file;
h->dir = dir;
- h->inp = inp;
+ h->inp = &ch->inp;
+ h->out = &ch->out;
list_add_first(&h->link_ring, &file_info_ring);
list_add_first(&h->link_hash, list_head);
+ list_init(&h->link_reqs);
return h;
}
@@ -144,6 +167,7 @@ static OpenFileInfo * find_open_file_info(char * id) {
}
static void delete_open_file_info(OpenFileInfo * h) {
+ assert(list_is_empty(&h->link_reqs));
list_remove(&h->link_ring);
list_remove(&h->link_hash);
loc_free(h);
@@ -166,6 +190,18 @@ static void channel_close_listener(Channel * c) {
if (h->file >= 0) {
close(h->file);
h->file = -1;
+ while (!list_is_empty(&h->link_reqs)) {
+ LINK * link = h->link_reqs.next;
+ IORequest * req = reqs2req(link);
+ list_remove(link);
+ if (h->posted_req == req) {
+ req->handle = NULL;
+ }
+ else {
+ loc_free(req->info.u.fio.bufp);
+ loc_free(req);
+ }
+ }
}
list_add_last(&h->link_hash, &list);
}
@@ -362,7 +398,7 @@ static void command_open(char * token, Channel * c) {
err = errno;
}
else {
- handle = create_open_file_info(&c->inp, path, file, NULL);
+ handle = create_open_file_info(c, path, file, NULL);
}
write_stringz(&c->out, "R");
@@ -372,6 +408,212 @@ static void command_open(char * token, Channel * c) {
write_stream(&c->out, MARKER_EOM);
}
+static void reply_close(char * token, OutputStream * out, int err) {
+ write_stringz(out, "R");
+ write_stringz(out, token);
+ write_fs_errno(out, err);
+ write_stream(out, MARKER_EOM);
+}
+
+static void reply_read(char * token, OutputStream * out, int err, void * buf, unsigned len, int eof) {
+ write_stringz(out, "R");
+ write_stringz(out, token);
+ if (buf != NULL) {
+ JsonWriteBinaryState state;
+
+ json_write_binary_start(&state, out);
+ json_write_binary_data(&state, buf, len);
+ json_write_binary_end(&state);
+ write_stream(out, 0);
+ }
+ else {
+ write_stringz(out, "null");
+ }
+ write_fs_errno(out, err);
+ json_write_boolean(out, eof);
+ write_stream(out, 0);
+ write_stream(out, MARKER_EOM);
+}
+
+static void reply_write(char * token, OutputStream * out, int err) {
+ write_stringz(out, "R");
+ write_stringz(out, token);
+ write_fs_errno(out, err);
+ write_stream(out, MARKER_EOM);
+}
+
+static void reply_stat(char * token, OutputStream * out, int err, struct_stat * buf) {
+ FileAttrs attrs;
+
+ if (err == 0) fill_attrs(&attrs, buf);
+ else memset(&attrs, 0, sizeof(attrs));
+
+ write_stringz(out, "R");
+ write_stringz(out, token);
+ write_fs_errno(out, err);
+ write_file_attrs(out, &attrs);
+ write_stream(out, 0);
+ write_stream(out, MARKER_EOM);
+}
+
+static void reply_setstat(char * token, OutputStream * out, int err) {
+ write_stringz(out, "R");
+ write_stringz(out, token);
+ write_fs_errno(out, err);
+ write_stream(out, MARKER_EOM);
+}
+
+static void post_io_requst(OpenFileInfo * handle);
+
+static void done_io_request(void * arg) {
+ int err = 0;
+ IORequest * req = (IORequest *)((AsyncReqInfo *)arg)->client_data;
+ OpenFileInfo * handle = req->handle;
+
+ if (handle == NULL) {
+ loc_free(req->info.u.fio.bufp);
+ loc_free(req);
+ return;
+ }
+
+ assert(handle->posted_req == req);
+ assert(&handle->posted_req->link_reqs == handle->link_reqs.next);
+ handle->posted_req = NULL;
+ list_remove(&req->link_reqs);
+
+ switch (req->req) {
+ case REQ_READ:
+ if (req->info.error) {
+ reply_read(req->token, handle->out, req->info.error, NULL, 0, 0);
+ }
+ else {
+ reply_read(req->token, handle->out, 0,
+ req->info.u.fio.bufp, req->info.u.fio.rval,
+ (size_t)req->info.u.fio.rval < req->info.u.fio.bufsz);
+ }
+ break;
+ case REQ_WRITE:
+ if (req->info.error) err = req->info.error;
+ else if ((size_t)req->info.u.fio.rval < req->info.u.fio.bufsz) err = ENOSPC;
+ reply_write(req->token, handle->out, err);
+ break;
+ case REQ_CLOSE:
+ err = req->info.error;
+ reply_close(req->token, handle->out, err);
+ if (err == 0) {
+ loc_free(req);
+ while (!list_is_empty(&handle->link_reqs)) {
+ LINK * link = handle->link_reqs.next;
+ req = reqs2req(link);
+ switch (req->req) {
+ case REQ_READ:
+ reply_read(req->token, handle->out, EBADF, NULL, 0, 0);
+ break;
+ case REQ_WRITE:
+ reply_write(req->token, handle->out, EBADF);
+ break;
+ case REQ_FSTAT:
+ reply_stat(req->token, handle->out, EBADF, NULL);
+ break;
+ case REQ_FSETSTAT:
+ reply_setstat(req->token, handle->out, EBADF);
+ break;
+ case REQ_CLOSE:
+ reply_close(req->token, handle->out, EBADF);
+ break;
+ default:
+ assert(0);
+ }
+ list_remove(link);
+ loc_free(req->info.u.fio.bufp);
+ loc_free(req);
+ }
+ flush_stream(handle->out);
+ delete_open_file_info(handle);
+ return;
+ }
+ break;
+ default:
+ assert(0);
+ }
+
+ loc_free(req->info.u.fio.bufp);
+ loc_free(req);
+ post_io_requst(handle);
+ flush_stream(handle->out);
+}
+
+static void post_io_requst(OpenFileInfo * handle) {
+ while (handle->posted_req == NULL && !list_is_empty(&handle->link_reqs)) {
+ LINK * link = handle->link_reqs.next;
+ IORequest * req = reqs2req(link);
+ switch (req->req) {
+ case REQ_READ:
+ case REQ_WRITE:
+ if (lseek(handle->file, req->offset, SEEK_SET) == -1) {
+ assert(errno != 0);
+ req->info.error = errno;
+ handle->posted_req = req;
+ post_event(done_io_request, &req->info);
+ return;
+ }
+ break;
+ case REQ_FSTAT:
+ {
+ int err = 0;
+ struct_stat buf;
+ memset(&buf, 0, sizeof(buf));
+ if (fstat(handle->file, &buf) < 0) err = errno;
+ reply_stat(req->token, handle->out, err, &buf);
+ list_remove(link);
+ }
+ continue;
+ case REQ_FSETSTAT:
+ {
+ int err = 0;
+ FileAttrs attrs = req->attrs;
+ if (attrs.flags & ATTR_SIZE) {
+ if (ftruncate(handle->file, attrs.size) < 0) err = errno;
+ }
+#if defined(WIN32) || defined(_WRS_KERNEL)
+ if (attrs.flags & ATTR_PERMISSIONS) {
+ if (chmod(handle->path, attrs.permissions) < 0) err = errno;
+ }
+#else
+ if (attrs.flags & ATTR_UIDGID) {
+ if (fchown(handle->file, attrs.uid, attrs.gid) < 0) err = errno;
+ }
+ if (attrs.flags & ATTR_PERMISSIONS) {
+ if (fchmod(handle->file, attrs.permissions) < 0) err = errno;
+ }
+#endif
+ if (attrs.flags & ATTR_ACMODTIME) {
+ struct utimbuf buf;
+ buf.actime = (long)(attrs.atime / 1000);
+ buf.modtime = (long)(attrs.mtime / 1000);
+ if (utime(handle->path, &buf) < 0) err = errno;
+ }
+ reply_setstat(req->token, handle->out, err);
+ list_remove(link);
+ }
+ continue;
+ }
+ handle->posted_req = req;
+ async_req_post(&req->info);
+ }
+}
+
+static IORequest * create_io_request(char * token, OpenFileInfo * handle, int type) {
+ IORequest * req = loc_alloc_zero(sizeof(IORequest));
+ req->req = type;
+ req->handle = handle;
+ req->info.done = done_io_request;
+ req->info.client_data = req;
+ strncpy(req->token, token, sizeof(req->token) - 1);
+ list_add_last(&req->link_reqs, &handle->link_reqs);
+ return req;
+}
+
static void command_close(char * token, Channel * c) {
char id[256];
OpenFileInfo * h = NULL;
@@ -394,28 +636,21 @@ static void command_close(char * token, Channel * c) {
}
}
else {
- if (close(h->file) < 0) {
- err = errno;
- }
- else {
- delete_open_file_info(h);
- }
+ IORequest * req = create_io_request(token, h, REQ_CLOSE);
+ req->info.type = AsyncReqClose;
+ req->info.u.fio.fd = h->file;
+ post_io_requst(h);
+ return;
}
- write_stringz(&c->out, "R");
- write_stringz(&c->out, token);
- write_fs_errno(&c->out, err);
- write_stream(&c->out, MARKER_EOM);
+ reply_close(token, &c->out, err);
}
static void command_read(char * token, Channel * c) {
char id[256];
OpenFileInfo * h = NULL;
- int err = 0;
- int eof = 0;
int64 offset;
unsigned long len;
- unsigned long cnt = 0;
json_read_string(&c->inp, id, sizeof(id));
if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
@@ -425,59 +660,31 @@ static void command_read(char * token, Channel * c) {
if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX);
- write_stringz(&c->out, "R");
- write_stringz(&c->out, token);
-
h = find_open_file_info(id);
if (h == NULL) {
- err = EBADF;
- write_stringz(&c->out, "null");
+ reply_read(token, &c->out, EBADF, NULL, 0, 0);
}
else {
- char buf[BUF_SIZE];
- JsonWriteBinaryState state;
- json_write_binary_start(&state, &c->out);
- while (cnt < len) {
- if (lseek(h->file, offset + cnt, SEEK_SET) == -1) {
- assert(errno != 0);
- err = errno;
- break;
- }
- else {
- int rd = read(h->file, buf, BUF_SIZE < len - cnt ? BUF_SIZE : len - cnt);
- if (rd < 0) {
- assert(errno != 0);
- err = errno;
- break;
- }
- if (rd == 0) {
- assert(cnt < len);
- eof = 1;
- break;
- }
- json_write_binary_data(&state, buf, rd);
- cnt += rd;
- }
- }
- json_write_binary_end(&state);
- write_stream(&c->out, 0);
+ IORequest * req = create_io_request(token, h, REQ_READ);
+ req->offset = offset;
+ req->info.type = AsyncReqRead;
+ req->info.u.fio.fd = h->file;
+ req->info.u.fio.bufp = loc_alloc(len);
+ req->info.u.fio.bufsz = len;
+ post_io_requst(h);
}
-
- assert(err || eof || cnt == len);
- write_fs_errno(&c->out, err);
- json_write_boolean(&c->out, eof);
- write_stream(&c->out, 0);
- write_stream(&c->out, MARKER_EOM);
}
static void command_write(char * token, Channel * c) {
char id[256];
OpenFileInfo * h = NULL;
- int err = 0;
int64 offset;
unsigned long len = 0;
JsonReadBinaryState state;
+ static size_t buf_size = 0;
+ static char * buf = NULL;
+
json_read_string(&c->inp, id, sizeof(id));
if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
offset = json_read_int64(&c->inp);
@@ -486,43 +693,33 @@ static void command_write(char * token, Channel * c) {
json_read_binary_start(&state, &c->inp);
h = find_open_file_info(id);
- if (h == NULL) err = EBADF;
for (;;) {
- char buf[BUF_SIZE];
- int rd = json_read_binary_data(&state, buf, sizeof(buf));
- if (rd == 0) break;
- if (err == 0 && lseek(h->file, offset + len, SEEK_SET) == -1) {
- err = errno;
- }
- if (err == 0) {
- int wr = write(h->file, buf, rd);
- if (wr < 0) err = errno;
- else if (wr < rd) err = ENOSPC;
+ int rd;
+ if (buf_size < len + BUF_SIZE) {
+ buf_size += BUF_SIZE;
+ buf = loc_realloc(buf, buf_size);
}
+ rd = json_read_binary_data(&state, buf + len, buf_size - len);
+ if (rd == 0) break;
len += rd;
}
json_read_binary_end(&state);
if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX);
- write_stringz(&c->out, "R");
- write_stringz(&c->out, token);
- write_fs_errno(&c->out, err);
- write_stream(&c->out, MARKER_EOM);
-}
-
-static void write_stat_result(char * token, Channel * c, int err, struct_stat * buf) {
- FileAttrs attrs;
-
- if (err == 0) fill_attrs(&attrs, buf);
- else memset(&attrs, 0, sizeof(attrs));
-
- write_stringz(&c->out, "R");
- write_stringz(&c->out, token);
- write_fs_errno(&c->out, err);
- write_file_attrs(&c->out, &attrs);
- write_stream(&c->out, 0);
- write_stream(&c->out, MARKER_EOM);
+ if (h == NULL) {
+ reply_write(token, &c->out, EBADF);
+ }
+ else {
+ IORequest * req = create_io_request(token, h, REQ_WRITE);
+ req->offset = offset;
+ req->info.type = AsyncReqWrite;
+ req->info.u.fio.fd = h->file;
+ req->info.u.fio.bufp = loc_alloc(len);
+ req->info.u.fio.bufsz = len;
+ memcpy(req->info.u.fio.bufp, buf, len);
+ post_io_requst(h);
+ }
}
static void command_stat(char * token, Channel * c) {
@@ -537,7 +734,7 @@ static void command_stat(char * token, Channel * c) {
memset(&buf, 0, sizeof(buf));
if (stat(path, &buf) < 0) err = errno;
- write_stat_result(token, c, err, &buf);
+ reply_stat(token, &c->out, err, &buf);
}
static void command_lstat(char * token, Channel * c) {
@@ -552,24 +749,25 @@ static void command_lstat(char * token, Channel * c) {
memset(&buf, 0, sizeof(buf));
if (lstat(path, &buf) < 0) err = errno;
- write_stat_result(token, c, err, &buf);
+ reply_stat(token, &c->out, err, &buf);
}
static void command_fstat(char * token, Channel * c) {
char id[256];
- struct_stat buf;
OpenFileInfo * h = NULL;
- int err = 0;
json_read_string(&c->inp, id, sizeof(id));
if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX);
h = find_open_file_info(id);
- memset(&buf, 0, sizeof(buf));
- if (h == NULL) err = EBADF;
- else if (fstat(h->file, &buf) < 0) err = errno;
- write_stat_result(token, c, err, &buf);
+ if (h == NULL) {
+ reply_stat(token, &c->out, EBADF, NULL);
+ }
+ else {
+ create_io_request(token, h, REQ_FSTAT);
+ post_io_requst(h);
+ }
}
static void command_setstat(char * token, Channel * c) {
@@ -602,17 +800,13 @@ static void command_setstat(char * token, Channel * c) {
if (utime(path, &buf) < 0) err = errno;
}
- write_stringz(&c->out, "R");
- write_stringz(&c->out, token);
- write_fs_errno(&c->out, err);
- write_stream(&c->out, MARKER_EOM);
+ reply_setstat(token, &c->out, err);
}
static void command_fsetstat(char * token, Channel * c) {
char id[256];
FileAttrs attrs;
OpenFileInfo * h = NULL;
- int err = 0;
json_read_string(&c->inp, id, sizeof(id));
if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX);
@@ -623,36 +817,13 @@ static void command_fsetstat(char * token, Channel * c) {
h = find_open_file_info(id);
if (h == NULL) {
- err = EBADF;
+ reply_setstat(token, &c->out, EBADF);
}
else {
- if (attrs.flags & ATTR_SIZE) {
- if (ftruncate(h->file, attrs.size) < 0) err = errno;
- }
-#if defined(WIN32) || defined(_WRS_KERNEL)
- if (attrs.flags & ATTR_PERMISSIONS) {
- if (chmod(h->path, attrs.permissions) < 0) err = errno;
- }
-#else
- if (attrs.flags & ATTR_UIDGID) {
- if (fchown(h->file, attrs.uid, attrs.gid) < 0) err = errno;
- }
- if (attrs.flags & ATTR_PERMISSIONS) {
- if (fchmod(h->file, attrs.permissions) < 0) err = errno;
- }
-#endif
- if (attrs.flags & ATTR_ACMODTIME) {
- struct utimbuf buf;
- buf.actime = (long)(attrs.atime / 1000);
- buf.modtime = (long)(attrs.mtime / 1000);
- if (utime(h->path, &buf) < 0) err = errno;
- }
+ IORequest * req = create_io_request(token, h, REQ_FSETSTAT);
+ req->attrs = attrs;
+ post_io_requst(h);
}
-
- write_stringz(&c->out, "R");
- write_stringz(&c->out, token);
- write_fs_errno(&c->out, err);
- write_stream(&c->out, MARKER_EOM);
}
static void command_opendir(char * token, Channel * c) {
@@ -670,7 +841,7 @@ static void command_opendir(char * token, Channel * c) {
err = errno;
}
else {
- handle = create_open_file_info(&c->inp, path, -1, dir);
+ handle = create_open_file_info(c, path, -1, dir);
}
write_stringz(&c->out, "R");

Back to the top