From 6d544dd1bacd6ce6b0dee18bfb801256c5171ddc Mon Sep 17 00:00:00 2001 From: Felix Burton Date: Wed, 17 Oct 2012 11:19:05 -0700 Subject: TCF Agent: add shutdown manager to enable controlled shutdown of subsystems --- agent/agent.vcproj | 8 +++ agent/agent.vcxproj | 2 + agent/agent.vcxproj.filters | 6 ++ agent/tcf/framework/asyncreq.c | 71 ++++++++++++++++++++--- agent/tcf/framework/channel.c | 23 ++++++++ agent/tcf/framework/channel.h | 2 + agent/tcf/framework/channel_pipe.c | 10 ++++ agent/tcf/framework/channel_tcp.c | 6 ++ agent/tcf/framework/shutdown.c | 115 +++++++++++++++++++++++++++++++++++++ agent/tcf/framework/shutdown.h | 96 +++++++++++++++++++++++++++++++ agent/tcf/framework/trace.c | 3 +- agent/tcf/framework/trace.h | 1 + server/server.vcproj | 8 +++ server/server.vcxproj | 2 + server/server.vcxproj.filters | 6 ++ 15 files changed, 350 insertions(+), 9 deletions(-) create mode 100644 agent/tcf/framework/shutdown.c create mode 100644 agent/tcf/framework/shutdown.h diff --git a/agent/agent.vcproj b/agent/agent.vcproj index 47345edd..cd3f39fa 100644 --- a/agent/agent.vcproj +++ b/agent/agent.vcproj @@ -579,6 +579,14 @@ RelativePath=".\tcf\framework\proxy.h" > + + + + diff --git a/agent/agent.vcxproj b/agent/agent.vcxproj index 2e8e1326..05c46bb1 100644 --- a/agent/agent.vcxproj +++ b/agent/agent.vcxproj @@ -264,6 +264,7 @@ + @@ -407,6 +408,7 @@ + diff --git a/agent/agent.vcxproj.filters b/agent/agent.vcxproj.filters index a5a7e7fb..f2a8446b 100644 --- a/agent/agent.vcxproj.filters +++ b/agent/agent.vcxproj.filters @@ -114,6 +114,9 @@ framework + + framework + framework @@ -374,6 +377,9 @@ framework + + framework + framework diff --git a/agent/tcf/framework/asyncreq.c b/agent/tcf/framework/asyncreq.c index 44838ca9..f9904f9d 100644 --- a/agent/tcf/framework/asyncreq.c +++ b/agent/tcf/framework/asyncreq.c @@ -34,11 +34,13 @@ #include #include #include +#include #define MAX_WORKER_THREADS 32 static LINK wtlist = TCF_LIST_INIT(wtlist); static int wtlist_size = 0; +static int wtrunning_count = 0; static pthread_mutex_t wtlock; typedef struct WorkerThread { @@ -50,6 +52,37 @@ typedef struct WorkerThread { #define wtlink2wt(A) ((WorkerThread *)((char *)(A) - offsetof(WorkerThread, wtlink))) +static AsyncReqInfo shutdown_req; + +static void trigger_async_shutdown(ShutdownInfo * obj) { + check_error(pthread_mutex_lock(&wtlock)); + while (!list_is_empty(&wtlist)) { + WorkerThread * wt = wtlink2wt(wtlist.next); + list_remove(&wt->wtlink); + wtlist_size--; + assert(wt->req == NULL); + wt->req = &shutdown_req; + check_error(pthread_cond_signal(&wt->cond)); + } + check_error(pthread_mutex_unlock(&wtlock)); +} + +static ShutdownInfo async_shutdown = { trigger_async_shutdown }; + + +static void worker_thread_exit(void * x) { + WorkerThread * wt = (WorkerThread *)x; + + check_error(pthread_cond_destroy(&wt->cond)); + pthread_join(wt->thread, NULL); + loc_free(wt); + check_error(pthread_mutex_lock(&wtlock)); + if (--wtrunning_count == 0) + shutdown_set_stopped(&async_shutdown); + trace(LOG_ASYNCREQ, "worker_thread_exit %p running threads %d", wt, wtrunning_count); + check_error(pthread_mutex_unlock(&wtlock)); +} + static void * worker_thread_handler(void * x) { WorkerThread * wt = (WorkerThread *)x; @@ -191,9 +224,8 @@ static void * worker_thread_handler(void * x) { * not created unnecessarily */ post_event(req->done, req); wt->req = NULL; - if (wtlist_size >= MAX_WORKER_THREADS) { - check_error(pthread_cond_destroy(&wt->cond)); - loc_free(wt); + if (wtlist_size >= MAX_WORKER_THREADS || + async_shutdown.state == SHUTDOWN_STATE_PENDING) { check_error(pthread_mutex_unlock(&wtlock)); break; } @@ -204,11 +236,33 @@ static void * worker_thread_handler(void * x) { if (wt->req != NULL) break; } check_error(pthread_mutex_unlock(&wtlock)); + if (wt->req == &shutdown_req) break; } - check_error(pthread_detach(pthread_self())); + post_event(worker_thread_exit, wt); return NULL; } +static void worker_thread_add(AsyncReqInfo * req) { + WorkerThread * wt; + + assert(is_dispatch_thread()); + wt = (WorkerThread *)loc_alloc_zero(sizeof *wt); + wt->req = req; + check_error(pthread_cond_init(&wt->cond, NULL)); + check_error(pthread_create(&wt->thread, &pthread_create_attr, worker_thread_handler, wt)); + if (wtrunning_count++ == 0) + shutdown_set_normal(&async_shutdown); + trace(LOG_ASYNCREQ, "worker_thread_add %p running threads %d", wt, wtrunning_count); +} + +static void worker_thread_add_deferred(void * x) { + AsyncReqInfo * req = (AsyncReqInfo *)x; + + check_error(pthread_mutex_lock(&wtlock)); + worker_thread_add(req); + check_error(pthread_mutex_unlock(&wtlock)); +} + #if ENABLE_AIO static void aio_done(union sigval arg) { AsyncReqInfo * req = (AsyncReqInfo *)arg.sival_ptr; @@ -253,10 +307,11 @@ void async_req_post(AsyncReqInfo * req) { check_error(pthread_mutex_lock(&wtlock)); if (list_is_empty(&wtlist)) { assert(wtlist_size == 0); - wt = (WorkerThread *)loc_alloc_zero(sizeof *wt); - wt->req = req; - check_error(pthread_cond_init(&wt->cond, NULL)); - check_error(pthread_create(&wt->thread, &pthread_create_attr, worker_thread_handler, wt)); + if (is_dispatch_thread()) { + worker_thread_add(req); + } else { + post_event(worker_thread_add_deferred, req); + } } else { wt = wtlink2wt(wtlist.next); diff --git a/agent/tcf/framework/channel.c b/agent/tcf/framework/channel.c index 5b7ad08f..f7b5f9e3 100644 --- a/agent/tcf/framework/channel.c +++ b/agent/tcf/framework/channel.c @@ -43,6 +43,9 @@ #include #include +static void trigger_channel_shutdown(ShutdownInfo * obj); + +ShutdownInfo channel_shutdown = { trigger_channel_shutdown }; LINK channel_root = TCF_LIST_INIT(channel_root); LINK channel_server_root = TCF_LIST_INIT(channel_server_root); @@ -64,6 +67,26 @@ static int close_listeners_cnt = 0; static const int BROADCAST_OK_STATES = (1 << ChannelStateConnected) | (1 << ChannelStateRedirectSent) | (1 << ChannelStateRedirectReceived); #define isBoardcastOkay(c) ((1 << (c)->state) & BROADCAST_OK_STATES) +static void trigger_channel_shutdown(ShutdownInfo * obj) { + LINK * l; + + l = channel_root.next; + while (l != &channel_root) { + Channel * c = chanlink2channelp(l); + l = l->next; + if (!is_channel_closed(c)) { + channel_close(c); + } + } + + l = channel_server_root.next; + while (l != &channel_server_root) { + ChannelServer * s = servlink2channelserverp(l); + l = l->next; + s->close(s); + } +} + static void write_all(OutputStream * out, int byte) { TCFBroadcastGroup * bcg = out2bcast(out); LINK * l = bcg->channels.next; diff --git a/agent/tcf/framework/channel.h b/agent/tcf/framework/channel.h index 11820f59..d6f5a212 100644 --- a/agent/tcf/framework/channel.h +++ b/agent/tcf/framework/channel.h @@ -23,7 +23,9 @@ #include #include #include +#include +extern ShutdownInfo channel_shutdown; extern LINK channel_root; extern LINK channel_server_root; diff --git a/agent/tcf/framework/channel_pipe.c b/agent/tcf/framework/channel_pipe.c index 0209f6dc..e9d8f391 100644 --- a/agent/tcf/framework/channel_pipe.c +++ b/agent/tcf/framework/channel_pipe.c @@ -86,6 +86,7 @@ struct ServerInstance { struct ServerPIPE { ChannelServer serv; + int closed; LINK servlink; ServerInstance arr[SERVER_INSTANCE_CNT]; }; @@ -117,6 +118,8 @@ static void delete_channel(ChannelPIPE * c) { channel_clear_broadcast_group(&c->chan); close_input_pipe(c); list_remove(&c->chan.chanlink); + if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root)) + shutdown_set_stopped(&channel_shutdown); c->magic = 0; loc_free(c->ibuf.buf); loc_free(c->chan.peer_name); @@ -465,6 +468,7 @@ static ChannelPIPE * create_channel(int fd_inp, int fd_out, ServerInstance * ser c->chan.out.write_block = pipe_write_block_stream; c->chan.out.splice_block = pipe_splice_block_stream; list_add_last(&c->chan.chanlink, &channel_root); + shutdown_set_normal(&channel_shutdown); c->chan.state = ChannelStateStartWait; c->chan.incoming = server != NULL; c->chan.start_comm = start_channel; @@ -698,7 +702,12 @@ static void server_close(ChannelServer * serv) { int i; assert(is_dispatch_thread()); + if (s->closed) + return; + s->closed = 1; list_remove(&s->serv.servlink); + if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root)) + shutdown_set_stopped(&channel_shutdown); list_remove(&s->servlink); peer_server_free(s->serv.ps); for (i = 0; i < SERVER_INSTANCE_CNT; i++) { @@ -737,6 +746,7 @@ ChannelServer * channel_pipe_server(PeerServer * ps) { s->serv.ps = ps; s->serv.close = server_close; list_add_last(&s->serv.servlink, &channel_server_root); + shutdown_set_normal(&channel_shutdown); list_add_last(&s->servlink, &server_list); for (i = 0; i < SERVER_INSTANCE_CNT; i++) { ServerInstance * ins = s->arr + i; diff --git a/agent/tcf/framework/channel_tcp.c b/agent/tcf/framework/channel_tcp.c index fc1c13f0..1d3f591e 100644 --- a/agent/tcf/framework/channel_tcp.c +++ b/agent/tcf/framework/channel_tcp.c @@ -238,6 +238,8 @@ static void delete_channel(ChannelTCP * c) { c->socket = -1; } list_remove(&c->chan.chanlink); + if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root)) + shutdown_set_stopped(&channel_shutdown); c->magic = 0; #if ENABLE_OutputQueue output_queue_clear(&c->out_queue); @@ -863,6 +865,7 @@ static ChannelTCP * create_channel(int sock, int en_ssl, int server, int unix_do c->chan.out.write_block = tcp_write_block_stream; c->chan.out.splice_block = tcp_splice_block_stream; list_add_last(&c->chan.chanlink, &channel_root); + shutdown_set_normal(&channel_shutdown); c->chan.state = ChannelStateStartWait; c->chan.incoming = server; c->chan.start_comm = start_channel; @@ -1029,6 +1032,8 @@ static void server_close(ChannelServer * serv) { assert(is_dispatch_thread()); if (s->sock < 0) return; list_remove(&s->serv.servlink); + if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root)) + shutdown_set_stopped(&channel_shutdown); list_remove(&s->servlink); peer_server_free(s->serv.ps); closesocket(s->sock); @@ -1061,6 +1066,7 @@ static ChannelServer * channel_server_create(PeerServer * ps, int sock) { post_event_with_delay(refresh_all_peer_servers, NULL, PEER_DATA_REFRESH_PERIOD * 1000000); } list_add_last(&si->serv.servlink, &channel_server_root); + shutdown_set_normal(&channel_shutdown); list_add_last(&si->servlink, &server_list); refresh_peer_server(sock, ps); diff --git a/agent/tcf/framework/shutdown.c b/agent/tcf/framework/shutdown.c new file mode 100644 index 00000000..d54ceaf7 --- /dev/null +++ b/agent/tcf/framework/shutdown.c @@ -0,0 +1,115 @@ +/******************************************************************************* + * Copyright (c) 2012 Xilinx, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * You may elect to redistribute this code under either of these licenses. + * + * Contributors: + * Xilinx - initial API and implementation + *******************************************************************************/ + +/* + * Shutdown manager. + */ + +#include +#include +#include +#include +#include + +#include + +static void (*shutdown_complete_handler)(void *shutdown_complete_arg); +static void * shutdown_complete_arg; +static int notify_pending; + +LINK shutdown_active = TCF_LIST_INIT(shutdown_active); + +static void notify_all(void * arg) { + LINK * l; + + assert(notify_pending); + notify_pending = 0; + + if (list_is_empty(&shutdown_active)) { + /* All subsystems are stopped, cancel event dispatching and + * notify shutdown initiator. */ + trace(LOG_SHUTDOWN, "shutdown complete"); + cancel_event_loop(); + shutdown_complete_handler(shutdown_complete_arg); + return; + } + + l = shutdown_active.next; + while (l != &shutdown_active) { + ShutdownInfo *obj = active2ShutdownInfo(l); + l = l->next; + trace(LOG_SHUTDOWN, "shutdown notify %p %d", obj, obj->state); + if (obj->state == SHUTDOWN_STATE_NORMAL) { + obj->state = SHUTDOWN_STATE_PENDING; + obj->notify(obj); + } else { + assert(obj->state == SHUTDOWN_STATE_PENDING); + } + } +} + +int shutdown_start(void (*handler)(void *arg), void * arg) { + assert(is_dispatch_thread()); + assert(handler != NULL); + assert(!notify_pending); + + trace(LOG_SHUTDOWN, "shutdown start"); + if (shutdown_complete_handler) { + return 1; + } + shutdown_complete_handler = handler; + shutdown_complete_arg = arg; + notify_pending = 1; + post_event(notify_all, NULL); + return 0; +} + +void shutdown_set_stopped(ShutdownInfo *obj) { + assert(is_dispatch_thread()); + assert(obj != NULL); + + trace(LOG_SHUTDOWN, "shutdown_set_stopped %p", obj); + if (obj->state == SHUTDOWN_STATE_STOPPED) { + /* Noop */ + return; + } + + assert(obj->state == SHUTDOWN_STATE_NORMAL || + obj->state == SHUTDOWN_STATE_PENDING); + obj->state = SHUTDOWN_STATE_STOPPED; + list_remove(&obj->link_active); + if (shutdown_complete_handler && !notify_pending) { + notify_pending = 1; + post_event(notify_all, NULL); + } +} + +void shutdown_set_normal(ShutdownInfo *obj) { + assert(is_dispatch_thread()); + assert(obj != NULL); + + trace(LOG_SHUTDOWN, "shutdown_set_normal %p", obj); + if (obj->state == SHUTDOWN_STATE_NORMAL) { + /* Noop */ + return; + } + + if (obj->state == SHUTDOWN_STATE_STOPPED) { + list_add_last(&obj->link_active, &shutdown_active); + } else { + assert(obj->state == SHUTDOWN_STATE_PENDING); + } + obj->state = SHUTDOWN_STATE_NORMAL; +} diff --git a/agent/tcf/framework/shutdown.h b/agent/tcf/framework/shutdown.h new file mode 100644 index 00000000..63ae35bd --- /dev/null +++ b/agent/tcf/framework/shutdown.h @@ -0,0 +1,96 @@ +/******************************************************************************* + * Copyright (c) 2012 Xilinx, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * You may elect to redistribute this code under either of these licenses. + * + * Contributors: + * Xilinx - initial API and implementation + *******************************************************************************/ + +/* + * Shutdown manager allows subsystems to register for notification to + * support controlled shutdown of the TCF framework and its add-ons. + * + * A subsystem registers for notificatios by calling + * shutdown_set_normal() with a subsystem specific ShutdownInfo object + * containing a subsystem callback function, client data and subsystem + * shutdown state. + * + * The callback function should initiate shutdown of the subsystem and + * when completed call shutdown_set_stopped(), or if the subsystem + * cannot be stopped because other subsystems actively using it to + * complete their shutdown sequence the subsystem may choose request + * futher notification callbacks by calling shutdown_set_normal() + * again, however this should be done carefully to avoid infinite + * loops. + */ + +#ifndef D_shutdown +#define D_shutdown + +#include + +#define active2ShutdownInfo(A) ((ShutdownInfo *)((char *)(A) - offsetof(ShutdownInfo, link_active))) + +typedef struct ShutdownInfo ShutdownInfo; + +typedef enum { + /* Shutdown object is stopped and is not on shutdown_active list */ + SHUTDOWN_STATE_STOPPED = 0, + + /* Shutdown object needs to be notified if shutdown is started or + * the state of other shutdown objects is changed */ + SHUTDOWN_STATE_NORMAL, + + /* Shutdown object down has been notified and will updates its + * state when possible */ + SHUTDOWN_STATE_PENDING +} ShutdownState; + +struct ShutdownInfo { + /* Called to notify of initial shutdown request or state changes + * of other shutdown objects */ + void (*notify)(ShutdownInfo *); + + /* Client specific data */ + void * client_data; + + /* State of object */ + ShutdownState state; + + /* List of active object */ + LINK link_active; +}; + +/* List of active shutdown objects, i.e. states NORMAL or PENDING */ +extern LINK shutdown_active; + +/* + * Initiates shutdown process. Returns true if shutdown is already started. + * + * handler - the function that should called when shutdown is complete. + * arg - pointer to shutdown data. + */ +extern int shutdown_start(void (*handler)(void *arg), void * arg); + +/* + * Set state of shutdown of object to stopped. + * + * obj - affected ShutdownInfo object. + */ +extern void shutdown_set_stopped(ShutdownInfo *obj); + +/* + * Set state of shutdown of object to normal. + * + * obj - affected ShutdownInfo object. + */ +extern void shutdown_set_normal(ShutdownInfo *obj); + +#endif /* D_shutdown */ diff --git a/agent/tcf/framework/trace.c b/agent/tcf/framework/trace.c index c3192774..e9095495 100644 --- a/agent/tcf/framework/trace.c +++ b/agent/tcf/framework/trace.c @@ -53,7 +53,8 @@ struct trace_mode trace_mode_table[MAX_TRACE_MODES + 1] = { { LOG_ELF, "elf", "ELF reader" }, { LOG_LUA, "lua", "LUA interpreter" }, { LOG_STACK, "stack", "stack trace service" }, - { LOG_PLUGIN, "plugin", "plugins" } + { LOG_PLUGIN, "plugin", "plugins" }, + { LOG_SHUTDOWN, "shutdown", "shutdown of subsystems" } }; static pthread_mutex_t mutex; diff --git a/agent/tcf/framework/trace.h b/agent/tcf/framework/trace.h index 9cddd35f..9f8e547a 100644 --- a/agent/tcf/framework/trace.h +++ b/agent/tcf/framework/trace.h @@ -40,6 +40,7 @@ #define LOG_LUA 0x1000 #define LOG_STACK 0x2000 #define LOG_PLUGIN 0x4000 +#define LOG_SHUTDOWN 0x8000 #define LOG_NAME_STDERR "-" diff --git a/server/server.vcproj b/server/server.vcproj index a02b098b..a2933df7 100644 --- a/server/server.vcproj +++ b/server/server.vcproj @@ -718,6 +718,14 @@ RelativePath="..\agent\tcf\framework\proxy.h" > + + + + diff --git a/server/server.vcxproj b/server/server.vcxproj index acf2c9d7..05e3e625 100644 --- a/server/server.vcxproj +++ b/server/server.vcxproj @@ -222,6 +222,7 @@ + @@ -274,6 +275,7 @@ + diff --git a/server/server.vcxproj.filters b/server/server.vcxproj.filters index ecf02ce7..7a051b04 100644 --- a/server/server.vcxproj.filters +++ b/server/server.vcxproj.filters @@ -156,6 +156,9 @@ framework + + framework + framework @@ -308,6 +311,9 @@ framework + + framework + framework -- cgit v1.2.3