From 6d544dd1bacd6ce6b0dee18bfb801256c5171ddc Mon Sep 17 00:00:00 2001
From: Felix Burton
Date: Wed, 17 Oct 2012 11:19:05 -0700
Subject: TCF Agent: add shutdown manager to enable controlled shutdown of
subsystems
---
agent/agent.vcproj | 8 +++
agent/agent.vcxproj | 2 +
agent/agent.vcxproj.filters | 6 ++
agent/tcf/framework/asyncreq.c | 71 ++++++++++++++++++++---
agent/tcf/framework/channel.c | 23 ++++++++
agent/tcf/framework/channel.h | 2 +
agent/tcf/framework/channel_pipe.c | 10 ++++
agent/tcf/framework/channel_tcp.c | 6 ++
agent/tcf/framework/shutdown.c | 115 +++++++++++++++++++++++++++++++++++++
agent/tcf/framework/shutdown.h | 96 +++++++++++++++++++++++++++++++
agent/tcf/framework/trace.c | 3 +-
agent/tcf/framework/trace.h | 1 +
server/server.vcproj | 8 +++
server/server.vcxproj | 2 +
server/server.vcxproj.filters | 6 ++
15 files changed, 350 insertions(+), 9 deletions(-)
create mode 100644 agent/tcf/framework/shutdown.c
create mode 100644 agent/tcf/framework/shutdown.h
diff --git a/agent/agent.vcproj b/agent/agent.vcproj
index 47345edd..cd3f39fa 100644
--- a/agent/agent.vcproj
+++ b/agent/agent.vcproj
@@ -579,6 +579,14 @@
RelativePath=".\tcf\framework\proxy.h"
>
+
+
+
+
diff --git a/agent/agent.vcxproj b/agent/agent.vcxproj
index 2e8e1326..05c46bb1 100644
--- a/agent/agent.vcxproj
+++ b/agent/agent.vcxproj
@@ -264,6 +264,7 @@
+
@@ -407,6 +408,7 @@
+
diff --git a/agent/agent.vcxproj.filters b/agent/agent.vcxproj.filters
index a5a7e7fb..f2a8446b 100644
--- a/agent/agent.vcxproj.filters
+++ b/agent/agent.vcxproj.filters
@@ -114,6 +114,9 @@
framework
+
+ framework
+
framework
@@ -374,6 +377,9 @@
framework
+
+ framework
+
framework
diff --git a/agent/tcf/framework/asyncreq.c b/agent/tcf/framework/asyncreq.c
index 44838ca9..f9904f9d 100644
--- a/agent/tcf/framework/asyncreq.c
+++ b/agent/tcf/framework/asyncreq.c
@@ -34,11 +34,13 @@
#include
#include
#include
+#include
#define MAX_WORKER_THREADS 32
static LINK wtlist = TCF_LIST_INIT(wtlist);
static int wtlist_size = 0;
+static int wtrunning_count = 0;
static pthread_mutex_t wtlock;
typedef struct WorkerThread {
@@ -50,6 +52,37 @@ typedef struct WorkerThread {
#define wtlink2wt(A) ((WorkerThread *)((char *)(A) - offsetof(WorkerThread, wtlink)))
+static AsyncReqInfo shutdown_req;
+
+static void trigger_async_shutdown(ShutdownInfo * obj) {
+ check_error(pthread_mutex_lock(&wtlock));
+ while (!list_is_empty(&wtlist)) {
+ WorkerThread * wt = wtlink2wt(wtlist.next);
+ list_remove(&wt->wtlink);
+ wtlist_size--;
+ assert(wt->req == NULL);
+ wt->req = &shutdown_req;
+ check_error(pthread_cond_signal(&wt->cond));
+ }
+ check_error(pthread_mutex_unlock(&wtlock));
+}
+
+static ShutdownInfo async_shutdown = { trigger_async_shutdown };
+
+
+static void worker_thread_exit(void * x) {
+ WorkerThread * wt = (WorkerThread *)x;
+
+ check_error(pthread_cond_destroy(&wt->cond));
+ pthread_join(wt->thread, NULL);
+ loc_free(wt);
+ check_error(pthread_mutex_lock(&wtlock));
+ if (--wtrunning_count == 0)
+ shutdown_set_stopped(&async_shutdown);
+ trace(LOG_ASYNCREQ, "worker_thread_exit %p running threads %d", wt, wtrunning_count);
+ check_error(pthread_mutex_unlock(&wtlock));
+}
+
static void * worker_thread_handler(void * x) {
WorkerThread * wt = (WorkerThread *)x;
@@ -191,9 +224,8 @@ static void * worker_thread_handler(void * x) {
* not created unnecessarily */
post_event(req->done, req);
wt->req = NULL;
- if (wtlist_size >= MAX_WORKER_THREADS) {
- check_error(pthread_cond_destroy(&wt->cond));
- loc_free(wt);
+ if (wtlist_size >= MAX_WORKER_THREADS ||
+ async_shutdown.state == SHUTDOWN_STATE_PENDING) {
check_error(pthread_mutex_unlock(&wtlock));
break;
}
@@ -204,11 +236,33 @@ static void * worker_thread_handler(void * x) {
if (wt->req != NULL) break;
}
check_error(pthread_mutex_unlock(&wtlock));
+ if (wt->req == &shutdown_req) break;
}
- check_error(pthread_detach(pthread_self()));
+ post_event(worker_thread_exit, wt);
return NULL;
}
+static void worker_thread_add(AsyncReqInfo * req) {
+ WorkerThread * wt;
+
+ assert(is_dispatch_thread());
+ wt = (WorkerThread *)loc_alloc_zero(sizeof *wt);
+ wt->req = req;
+ check_error(pthread_cond_init(&wt->cond, NULL));
+ check_error(pthread_create(&wt->thread, &pthread_create_attr, worker_thread_handler, wt));
+ if (wtrunning_count++ == 0)
+ shutdown_set_normal(&async_shutdown);
+ trace(LOG_ASYNCREQ, "worker_thread_add %p running threads %d", wt, wtrunning_count);
+}
+
+static void worker_thread_add_deferred(void * x) {
+ AsyncReqInfo * req = (AsyncReqInfo *)x;
+
+ check_error(pthread_mutex_lock(&wtlock));
+ worker_thread_add(req);
+ check_error(pthread_mutex_unlock(&wtlock));
+}
+
#if ENABLE_AIO
static void aio_done(union sigval arg) {
AsyncReqInfo * req = (AsyncReqInfo *)arg.sival_ptr;
@@ -253,10 +307,11 @@ void async_req_post(AsyncReqInfo * req) {
check_error(pthread_mutex_lock(&wtlock));
if (list_is_empty(&wtlist)) {
assert(wtlist_size == 0);
- wt = (WorkerThread *)loc_alloc_zero(sizeof *wt);
- wt->req = req;
- check_error(pthread_cond_init(&wt->cond, NULL));
- check_error(pthread_create(&wt->thread, &pthread_create_attr, worker_thread_handler, wt));
+ if (is_dispatch_thread()) {
+ worker_thread_add(req);
+ } else {
+ post_event(worker_thread_add_deferred, req);
+ }
}
else {
wt = wtlink2wt(wtlist.next);
diff --git a/agent/tcf/framework/channel.c b/agent/tcf/framework/channel.c
index 5b7ad08f..f7b5f9e3 100644
--- a/agent/tcf/framework/channel.c
+++ b/agent/tcf/framework/channel.c
@@ -43,6 +43,9 @@
#include
#include
+static void trigger_channel_shutdown(ShutdownInfo * obj);
+
+ShutdownInfo channel_shutdown = { trigger_channel_shutdown };
LINK channel_root = TCF_LIST_INIT(channel_root);
LINK channel_server_root = TCF_LIST_INIT(channel_server_root);
@@ -64,6 +67,26 @@ static int close_listeners_cnt = 0;
static const int BROADCAST_OK_STATES = (1 << ChannelStateConnected) | (1 << ChannelStateRedirectSent) | (1 << ChannelStateRedirectReceived);
#define isBoardcastOkay(c) ((1 << (c)->state) & BROADCAST_OK_STATES)
+static void trigger_channel_shutdown(ShutdownInfo * obj) {
+ LINK * l;
+
+ l = channel_root.next;
+ while (l != &channel_root) {
+ Channel * c = chanlink2channelp(l);
+ l = l->next;
+ if (!is_channel_closed(c)) {
+ channel_close(c);
+ }
+ }
+
+ l = channel_server_root.next;
+ while (l != &channel_server_root) {
+ ChannelServer * s = servlink2channelserverp(l);
+ l = l->next;
+ s->close(s);
+ }
+}
+
static void write_all(OutputStream * out, int byte) {
TCFBroadcastGroup * bcg = out2bcast(out);
LINK * l = bcg->channels.next;
diff --git a/agent/tcf/framework/channel.h b/agent/tcf/framework/channel.h
index 11820f59..d6f5a212 100644
--- a/agent/tcf/framework/channel.h
+++ b/agent/tcf/framework/channel.h
@@ -23,7 +23,9 @@
#include
#include
#include
+#include
+extern ShutdownInfo channel_shutdown;
extern LINK channel_root;
extern LINK channel_server_root;
diff --git a/agent/tcf/framework/channel_pipe.c b/agent/tcf/framework/channel_pipe.c
index 0209f6dc..e9d8f391 100644
--- a/agent/tcf/framework/channel_pipe.c
+++ b/agent/tcf/framework/channel_pipe.c
@@ -86,6 +86,7 @@ struct ServerInstance {
struct ServerPIPE {
ChannelServer serv;
+ int closed;
LINK servlink;
ServerInstance arr[SERVER_INSTANCE_CNT];
};
@@ -117,6 +118,8 @@ static void delete_channel(ChannelPIPE * c) {
channel_clear_broadcast_group(&c->chan);
close_input_pipe(c);
list_remove(&c->chan.chanlink);
+ if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root))
+ shutdown_set_stopped(&channel_shutdown);
c->magic = 0;
loc_free(c->ibuf.buf);
loc_free(c->chan.peer_name);
@@ -465,6 +468,7 @@ static ChannelPIPE * create_channel(int fd_inp, int fd_out, ServerInstance * ser
c->chan.out.write_block = pipe_write_block_stream;
c->chan.out.splice_block = pipe_splice_block_stream;
list_add_last(&c->chan.chanlink, &channel_root);
+ shutdown_set_normal(&channel_shutdown);
c->chan.state = ChannelStateStartWait;
c->chan.incoming = server != NULL;
c->chan.start_comm = start_channel;
@@ -698,7 +702,12 @@ static void server_close(ChannelServer * serv) {
int i;
assert(is_dispatch_thread());
+ if (s->closed)
+ return;
+ s->closed = 1;
list_remove(&s->serv.servlink);
+ if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root))
+ shutdown_set_stopped(&channel_shutdown);
list_remove(&s->servlink);
peer_server_free(s->serv.ps);
for (i = 0; i < SERVER_INSTANCE_CNT; i++) {
@@ -737,6 +746,7 @@ ChannelServer * channel_pipe_server(PeerServer * ps) {
s->serv.ps = ps;
s->serv.close = server_close;
list_add_last(&s->serv.servlink, &channel_server_root);
+ shutdown_set_normal(&channel_shutdown);
list_add_last(&s->servlink, &server_list);
for (i = 0; i < SERVER_INSTANCE_CNT; i++) {
ServerInstance * ins = s->arr + i;
diff --git a/agent/tcf/framework/channel_tcp.c b/agent/tcf/framework/channel_tcp.c
index fc1c13f0..1d3f591e 100644
--- a/agent/tcf/framework/channel_tcp.c
+++ b/agent/tcf/framework/channel_tcp.c
@@ -238,6 +238,8 @@ static void delete_channel(ChannelTCP * c) {
c->socket = -1;
}
list_remove(&c->chan.chanlink);
+ if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root))
+ shutdown_set_stopped(&channel_shutdown);
c->magic = 0;
#if ENABLE_OutputQueue
output_queue_clear(&c->out_queue);
@@ -863,6 +865,7 @@ static ChannelTCP * create_channel(int sock, int en_ssl, int server, int unix_do
c->chan.out.write_block = tcp_write_block_stream;
c->chan.out.splice_block = tcp_splice_block_stream;
list_add_last(&c->chan.chanlink, &channel_root);
+ shutdown_set_normal(&channel_shutdown);
c->chan.state = ChannelStateStartWait;
c->chan.incoming = server;
c->chan.start_comm = start_channel;
@@ -1029,6 +1032,8 @@ static void server_close(ChannelServer * serv) {
assert(is_dispatch_thread());
if (s->sock < 0) return;
list_remove(&s->serv.servlink);
+ if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root))
+ shutdown_set_stopped(&channel_shutdown);
list_remove(&s->servlink);
peer_server_free(s->serv.ps);
closesocket(s->sock);
@@ -1061,6 +1066,7 @@ static ChannelServer * channel_server_create(PeerServer * ps, int sock) {
post_event_with_delay(refresh_all_peer_servers, NULL, PEER_DATA_REFRESH_PERIOD * 1000000);
}
list_add_last(&si->serv.servlink, &channel_server_root);
+ shutdown_set_normal(&channel_shutdown);
list_add_last(&si->servlink, &server_list);
refresh_peer_server(sock, ps);
diff --git a/agent/tcf/framework/shutdown.c b/agent/tcf/framework/shutdown.c
new file mode 100644
index 00000000..d54ceaf7
--- /dev/null
+++ b/agent/tcf/framework/shutdown.c
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * Copyright (c) 2012 Xilinx, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ * You may elect to redistribute this code under either of these licenses.
+ *
+ * Contributors:
+ * Xilinx - initial API and implementation
+ *******************************************************************************/
+
+/*
+ * Shutdown manager.
+ */
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+static void (*shutdown_complete_handler)(void *shutdown_complete_arg);
+static void * shutdown_complete_arg;
+static int notify_pending;
+
+LINK shutdown_active = TCF_LIST_INIT(shutdown_active);
+
+static void notify_all(void * arg) {
+ LINK * l;
+
+ assert(notify_pending);
+ notify_pending = 0;
+
+ if (list_is_empty(&shutdown_active)) {
+ /* All subsystems are stopped, cancel event dispatching and
+ * notify shutdown initiator. */
+ trace(LOG_SHUTDOWN, "shutdown complete");
+ cancel_event_loop();
+ shutdown_complete_handler(shutdown_complete_arg);
+ return;
+ }
+
+ l = shutdown_active.next;
+ while (l != &shutdown_active) {
+ ShutdownInfo *obj = active2ShutdownInfo(l);
+ l = l->next;
+ trace(LOG_SHUTDOWN, "shutdown notify %p %d", obj, obj->state);
+ if (obj->state == SHUTDOWN_STATE_NORMAL) {
+ obj->state = SHUTDOWN_STATE_PENDING;
+ obj->notify(obj);
+ } else {
+ assert(obj->state == SHUTDOWN_STATE_PENDING);
+ }
+ }
+}
+
+int shutdown_start(void (*handler)(void *arg), void * arg) {
+ assert(is_dispatch_thread());
+ assert(handler != NULL);
+ assert(!notify_pending);
+
+ trace(LOG_SHUTDOWN, "shutdown start");
+ if (shutdown_complete_handler) {
+ return 1;
+ }
+ shutdown_complete_handler = handler;
+ shutdown_complete_arg = arg;
+ notify_pending = 1;
+ post_event(notify_all, NULL);
+ return 0;
+}
+
+void shutdown_set_stopped(ShutdownInfo *obj) {
+ assert(is_dispatch_thread());
+ assert(obj != NULL);
+
+ trace(LOG_SHUTDOWN, "shutdown_set_stopped %p", obj);
+ if (obj->state == SHUTDOWN_STATE_STOPPED) {
+ /* Noop */
+ return;
+ }
+
+ assert(obj->state == SHUTDOWN_STATE_NORMAL ||
+ obj->state == SHUTDOWN_STATE_PENDING);
+ obj->state = SHUTDOWN_STATE_STOPPED;
+ list_remove(&obj->link_active);
+ if (shutdown_complete_handler && !notify_pending) {
+ notify_pending = 1;
+ post_event(notify_all, NULL);
+ }
+}
+
+void shutdown_set_normal(ShutdownInfo *obj) {
+ assert(is_dispatch_thread());
+ assert(obj != NULL);
+
+ trace(LOG_SHUTDOWN, "shutdown_set_normal %p", obj);
+ if (obj->state == SHUTDOWN_STATE_NORMAL) {
+ /* Noop */
+ return;
+ }
+
+ if (obj->state == SHUTDOWN_STATE_STOPPED) {
+ list_add_last(&obj->link_active, &shutdown_active);
+ } else {
+ assert(obj->state == SHUTDOWN_STATE_PENDING);
+ }
+ obj->state = SHUTDOWN_STATE_NORMAL;
+}
diff --git a/agent/tcf/framework/shutdown.h b/agent/tcf/framework/shutdown.h
new file mode 100644
index 00000000..63ae35bd
--- /dev/null
+++ b/agent/tcf/framework/shutdown.h
@@ -0,0 +1,96 @@
+/*******************************************************************************
+ * Copyright (c) 2012 Xilinx, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ * You may elect to redistribute this code under either of these licenses.
+ *
+ * Contributors:
+ * Xilinx - initial API and implementation
+ *******************************************************************************/
+
+/*
+ * Shutdown manager allows subsystems to register for notification to
+ * support controlled shutdown of the TCF framework and its add-ons.
+ *
+ * A subsystem registers for notificatios by calling
+ * shutdown_set_normal() with a subsystem specific ShutdownInfo object
+ * containing a subsystem callback function, client data and subsystem
+ * shutdown state.
+ *
+ * The callback function should initiate shutdown of the subsystem and
+ * when completed call shutdown_set_stopped(), or if the subsystem
+ * cannot be stopped because other subsystems actively using it to
+ * complete their shutdown sequence the subsystem may choose request
+ * futher notification callbacks by calling shutdown_set_normal()
+ * again, however this should be done carefully to avoid infinite
+ * loops.
+ */
+
+#ifndef D_shutdown
+#define D_shutdown
+
+#include
+
+#define active2ShutdownInfo(A) ((ShutdownInfo *)((char *)(A) - offsetof(ShutdownInfo, link_active)))
+
+typedef struct ShutdownInfo ShutdownInfo;
+
+typedef enum {
+ /* Shutdown object is stopped and is not on shutdown_active list */
+ SHUTDOWN_STATE_STOPPED = 0,
+
+ /* Shutdown object needs to be notified if shutdown is started or
+ * the state of other shutdown objects is changed */
+ SHUTDOWN_STATE_NORMAL,
+
+ /* Shutdown object down has been notified and will updates its
+ * state when possible */
+ SHUTDOWN_STATE_PENDING
+} ShutdownState;
+
+struct ShutdownInfo {
+ /* Called to notify of initial shutdown request or state changes
+ * of other shutdown objects */
+ void (*notify)(ShutdownInfo *);
+
+ /* Client specific data */
+ void * client_data;
+
+ /* State of object */
+ ShutdownState state;
+
+ /* List of active object */
+ LINK link_active;
+};
+
+/* List of active shutdown objects, i.e. states NORMAL or PENDING */
+extern LINK shutdown_active;
+
+/*
+ * Initiates shutdown process. Returns true if shutdown is already started.
+ *
+ * handler - the function that should called when shutdown is complete.
+ * arg - pointer to shutdown data.
+ */
+extern int shutdown_start(void (*handler)(void *arg), void * arg);
+
+/*
+ * Set state of shutdown of object to stopped.
+ *
+ * obj - affected ShutdownInfo object.
+ */
+extern void shutdown_set_stopped(ShutdownInfo *obj);
+
+/*
+ * Set state of shutdown of object to normal.
+ *
+ * obj - affected ShutdownInfo object.
+ */
+extern void shutdown_set_normal(ShutdownInfo *obj);
+
+#endif /* D_shutdown */
diff --git a/agent/tcf/framework/trace.c b/agent/tcf/framework/trace.c
index c3192774..e9095495 100644
--- a/agent/tcf/framework/trace.c
+++ b/agent/tcf/framework/trace.c
@@ -53,7 +53,8 @@ struct trace_mode trace_mode_table[MAX_TRACE_MODES + 1] = {
{ LOG_ELF, "elf", "ELF reader" },
{ LOG_LUA, "lua", "LUA interpreter" },
{ LOG_STACK, "stack", "stack trace service" },
- { LOG_PLUGIN, "plugin", "plugins" }
+ { LOG_PLUGIN, "plugin", "plugins" },
+ { LOG_SHUTDOWN, "shutdown", "shutdown of subsystems" }
};
static pthread_mutex_t mutex;
diff --git a/agent/tcf/framework/trace.h b/agent/tcf/framework/trace.h
index 9cddd35f..9f8e547a 100644
--- a/agent/tcf/framework/trace.h
+++ b/agent/tcf/framework/trace.h
@@ -40,6 +40,7 @@
#define LOG_LUA 0x1000
#define LOG_STACK 0x2000
#define LOG_PLUGIN 0x4000
+#define LOG_SHUTDOWN 0x8000
#define LOG_NAME_STDERR "-"
diff --git a/server/server.vcproj b/server/server.vcproj
index a02b098b..a2933df7 100644
--- a/server/server.vcproj
+++ b/server/server.vcproj
@@ -718,6 +718,14 @@
RelativePath="..\agent\tcf\framework\proxy.h"
>
+
+
+
+
diff --git a/server/server.vcxproj b/server/server.vcxproj
index acf2c9d7..05e3e625 100644
--- a/server/server.vcxproj
+++ b/server/server.vcxproj
@@ -222,6 +222,7 @@
+
@@ -274,6 +275,7 @@
+
diff --git a/server/server.vcxproj.filters b/server/server.vcxproj.filters
index ecf02ce7..7a051b04 100644
--- a/server/server.vcxproj.filters
+++ b/server/server.vcxproj.filters
@@ -156,6 +156,9 @@
framework
+
+ framework
+
framework
@@ -308,6 +311,9 @@
framework
+
+ framework
+
framework
--
cgit v1.2.3