From: David van Moolenbroek Date: Mon, 24 Aug 2015 20:44:26 +0000 (+0200) Subject: VFS: suspend threads for live update X-Git-Url: http://zhaoyanbai.com/repos/%22http:/static/doc/zpipe.c?a=commitdiff_plain;h=refs%2Fchanges%2F65%2F3165%2F1;p=minix.git VFS: suspend threads for live update - do not allow live update for request and protocol free states if there are any worker threads that have pending or active work; - destroy all worker threads before such live updates and recreate them afterwards, because transferring (the contents of) the thread stacks is not an option at this time; - recreate worker threads in the new instance only if they were shut down before the state transfer, by letting RS provide the original preparation state as initialization information. Change-Id: I846225f5b7281f19e69175485f2c88a4b4891dc2 --- diff --git a/minix/include/minix/ipc.h b/minix/include/minix/ipc.h index a40142334..e9ca59467 100644 --- a/minix/include/minix/ipc.h +++ b/minix/include/minix/ipc.h @@ -1598,7 +1598,8 @@ typedef struct { int flags; vir_bytes buff_addr; size_t buff_len; - uint8_t padding[24]; + int prepare_state; + uint8_t padding[20]; } mess_rs_init; _ASSERT_MSG_SIZE(mess_rs_init); diff --git a/minix/include/minix/sef.h b/minix/include/minix/sef.h index 80c1dc7c9..ab6ab2fcb 100644 --- a/minix/include/minix/sef.h +++ b/minix/include/minix/sef.h @@ -49,6 +49,7 @@ typedef struct { void* init_buff_cleanup_start; size_t init_buff_len; int copy_flags; + int prepare_state; } sef_init_info_t; /* Callback type definitions. */ diff --git a/minix/lib/libsys/sef_init.c b/minix/lib/libsys/sef_init.c index d64b4cdde..e3c50ceee 100644 --- a/minix/lib/libsys/sef_init.c +++ b/minix/lib/libsys/sef_init.c @@ -207,6 +207,7 @@ int do_sef_init_request(message *m_ptr) info.init_buff_start = (void*) m_ptr->m_rs_init.buff_addr; info.init_buff_cleanup_start = info.init_buff_start; info.init_buff_len = m_ptr->m_rs_init.buff_len; + info.prepare_state = m_ptr->m_rs_init.prepare_state; /* Peform initialization. */ r = process_init(type, &info); diff --git a/minix/servers/rs/utility.c b/minix/servers/rs/utility.c index 344a4e4d5..8f109e121 100644 --- a/minix/servers/rs/utility.c +++ b/minix/servers/rs/utility.c @@ -17,7 +17,7 @@ *===========================================================================*/ int init_service(struct rproc *rp, int type, int flags) { - int r; + int r, prepare_state; message m; endpoint_t old_endpoint; @@ -32,8 +32,10 @@ int init_service(struct rproc *rp, int type, int flags) /* Determine the old endpoint if this is a new instance. */ old_endpoint = NONE; + prepare_state = SEF_LU_STATE_NULL; if(rp->r_old_rp) { old_endpoint = rp->r_upd.state_endpoint; + prepare_state = rp->r_upd.prepare_state; } else if(rp->r_prev_rp) { old_endpoint = rp->r_prev_rp->r_pub->endpoint; @@ -53,6 +55,7 @@ int init_service(struct rproc *rp, int type, int flags) m.m_rs_init.restarts = (short) rp->r_restarts+1; m.m_rs_init.buff_addr = rp->r_map_prealloc_addr; m.m_rs_init.buff_len = rp->r_map_prealloc_len; + m.m_rs_init.prepare_state = prepare_state; rp->r_map_prealloc_addr = 0; rp->r_map_prealloc_len = 0; r = rs_asynsend(rp, &m, 0); diff --git a/minix/servers/vfs/main.c b/minix/servers/vfs/main.c index 68db53b6e..9c70e952e 100644 --- a/minix/servers/vfs/main.c +++ b/minix/servers/vfs/main.c @@ -47,6 +47,7 @@ static int unblock(struct fproc *rfp); /* SEF functions and variables. */ static void sef_local_startup(void); static int sef_cb_init_fresh(int type, sef_init_info_t *info); +static int sef_cb_init_lu(int type, sef_init_info_t *info); /*===========================================================================* * main * @@ -67,8 +68,8 @@ int main(void) /* This is the main loop that gets work, processes it, and sends replies. */ while (TRUE) { - yield_all(); /* let other threads run */ - self = NULL; + worker_yield(); /* let other threads run */ + send_work(); /* The get_work() function returns TRUE if we have a new message to @@ -280,15 +281,92 @@ static void do_work(void) if (error != SUSPEND) reply(&job_m_out, fp->fp_endpoint, error); } +/*===========================================================================* + * sef_cb_lu_prepare * + *===========================================================================*/ +static int sef_cb_lu_prepare(int state) +{ +/* This function is called to decide whether we can enter the given live + * update state, and to prepare for such an update. If we are requested to + * update to a request-free or protocol-free state, make sure there is no work + * pending or being processed, and shut down all worker threads. + */ + + switch (state) { + case SEF_LU_STATE_REQUEST_FREE: + case SEF_LU_STATE_PROTOCOL_FREE: + if (!worker_idle()) { + printf("VFS: worker threads not idle, blocking update\n"); + break; + } + + worker_cleanup(); + + return OK; + } + + return ENOTREADY; +} + +/*===========================================================================* + * sef_cb_lu_state_changed * + *===========================================================================*/ +static void sef_cb_lu_state_changed(int old_state, int state) +{ +/* Worker threads (especially their stacks) pose a serious problem for state + * transfer during live update, and therefore, we shut down all worker threads + * during live update and restart them afterwards. This function is called in + * the old VFS instance when the state changed. We use it to restart worker + * threads after a failed live update. + */ + + if (state != SEF_LU_STATE_NULL) + return; + + switch (old_state) { + case SEF_LU_STATE_REQUEST_FREE: + case SEF_LU_STATE_PROTOCOL_FREE: + worker_init(); + } +} + +/*===========================================================================* + * sef_cb_init_lu * + *===========================================================================*/ +static int sef_cb_init_lu(int type, sef_init_info_t *info) +{ +/* This function is called in the new VFS instance during a live update. */ + int r; + + /* Perform regular state transfer. */ + if ((r = SEF_CB_INIT_LU_DEFAULT(type, info)) != OK) + return r; + + /* Recreate worker threads, if necessary. */ + switch (info->prepare_state) { + case SEF_LU_STATE_REQUEST_FREE: + case SEF_LU_STATE_PROTOCOL_FREE: + worker_init(); + } + + return OK; +} + /*===========================================================================* * sef_local_startup * *===========================================================================*/ -static void sef_local_startup() +static void sef_local_startup(void) { /* Register init callbacks. */ sef_setcb_init_fresh(sef_cb_init_fresh); sef_setcb_init_restart(SEF_CB_INIT_RESTART_STATEFUL); + /* Register live update callbacks. */ + sef_setcb_init_lu(sef_cb_init_lu); + sef_setcb_lu_prepare(sef_cb_lu_prepare); + sef_setcb_lu_state_changed(sef_cb_lu_state_changed); + sef_setcb_lu_state_isvalid(sef_cb_lu_state_isvalid_standard); + /* Let SEF perform startup. */ sef_startup(); } diff --git a/minix/servers/vfs/proto.h b/minix/servers/vfs/proto.h index aeddf4646..27c6c083a 100644 --- a/minix/servers/vfs/proto.h +++ b/minix/servers/vfs/proto.h @@ -335,6 +335,8 @@ void select_unsuspend_by_endpt(endpoint_t proc); /* worker.c */ void worker_init(void); +void worker_cleanup(void); +int worker_idle(void); int worker_available(void); void worker_allow(int allow); struct worker_thread *worker_get(thread_t worker_tid); @@ -344,6 +346,7 @@ void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr, int use_spare); void worker_stop(struct worker_thread *worker); void worker_stop_by_endpt(endpoint_t proc_e); +void worker_yield(void); void worker_wait(void); struct worker_thread *worker_suspend(void); void worker_resume(struct worker_thread *org_self); diff --git a/minix/servers/vfs/threads.h b/minix/servers/vfs/threads.h index b4a30689d..681d7bf77 100644 --- a/minix/servers/vfs/threads.h +++ b/minix/servers/vfs/threads.h @@ -7,9 +7,6 @@ #define cond_t mthread_cond_t #define attr_t mthread_attr_t -#define yield mthread_yield -#define yield_all mthread_yield_all - #define mutex_init mthread_mutex_init #define mutex_destroy mthread_mutex_destroy #define mutex_lock mthread_mutex_lock diff --git a/minix/servers/vfs/worker.c b/minix/servers/vfs/worker.c index e942a5a02..e85d0d2be 100644 --- a/minix/servers/vfs/worker.c +++ b/minix/servers/vfs/worker.c @@ -1,7 +1,7 @@ #include "fs.h" +#include #include -static void worker_get_work(void); static void *worker_main(void *arg); static void worker_sleep(void); static void worker_wake(struct worker_thread *worker); @@ -24,7 +24,7 @@ static int block_all; *===========================================================================*/ void worker_init(void) { -/* Initialize worker thread */ +/* Initialize worker threads */ struct worker_thread *wp; int i; @@ -32,8 +32,7 @@ void worker_init(void) panic("failed to initialize attribute"); if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0) panic("couldn't set default thread stack size"); - if (mthread_attr_setdetachstate(&tattr, MTHREAD_CREATE_DETACHED) != 0) - panic("couldn't set default thread detach state"); + pending = 0; busy = 0; block_all = FALSE; @@ -47,13 +46,69 @@ void worker_init(void) if (mutex_init(&wp->w_event_mutex, NULL) != 0) panic("failed to initialize mutex"); if (cond_init(&wp->w_event, NULL) != 0) - panic("failed to initialize conditional variable"); + panic("failed to initialize condition variable"); if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0) panic("unable to start thread"); } /* Let all threads get ready to accept work. */ - yield_all(); + worker_yield(); +} + +/*===========================================================================* + * worker_cleanup * + *===========================================================================*/ +void worker_cleanup(void) +{ +/* Clean up worker threads, reversing the actions of worker_init() such that + * we can safely call worker_init() again later. All worker threads are + * expected to be idle already. Used for live updates, because transferring + * the thread stacks from one version to another is currently not feasible. + */ + struct worker_thread *wp; + int i; + + assert(worker_idle()); + + /* First terminate all threads. */ + for (i = 0; i < NR_WTHREADS; i++) { + wp = &workers[i]; + + assert(wp->w_fp == NULL); + + /* Waking up the thread with no w_fp will cause it to exit. */ + worker_wake(wp); + } + + worker_yield(); + + /* Then clean up their resources. */ + for (i = 0; i < NR_WTHREADS; i++) { + wp = &workers[i]; + + if (mthread_join(wp->w_tid, NULL) != 0) + panic("worker_cleanup: could not join thread %d", i); + if (cond_destroy(&wp->w_event) != 0) + panic("failed to destroy condition variable"); + if (mutex_destroy(&wp->w_event_mutex) != 0) + panic("failed to destroy mutex"); + } + + /* Finally, clean up global resources. */ + if (mthread_attr_destroy(&tattr) != 0) + panic("failed to destroy attribute"); + + memset(workers, 0, sizeof(workers)); +} + +/*===========================================================================* + * worker_idle * + *===========================================================================*/ +int worker_idle(void) +{ +/* Return whether all worker threads are idle. */ + + return (pending == 0 && busy == 0); } /*===========================================================================* @@ -132,10 +187,11 @@ void worker_allow(int allow) /*===========================================================================* * worker_get_work * *===========================================================================*/ -static void worker_get_work(void) +static int worker_get_work(void) { /* Find new work to do. Work can be 'queued', 'pending', or absent. In the - * latter case wait for new work to come in. + * latter case wait for new work to come in. Return TRUE if there is work to + * do, or FALSE if the current thread is requested to shut down. */ struct fproc *rfp; @@ -152,7 +208,7 @@ static void worker_get_work(void) rfp->fp_flags &= ~FP_PENDING; /* No longer pending */ assert(pending > 0); pending--; - return; + return TRUE; } } panic("Pending work inconsistency"); @@ -160,6 +216,8 @@ static void worker_get_work(void) /* Wait for work to come to us */ worker_sleep(); + + return (self->w_fp != NULL); } /*===========================================================================* @@ -183,8 +241,7 @@ static void *worker_main(void *arg) self = (struct worker_thread *) arg; ASSERTW(self); - while(TRUE) { - worker_get_work(); + while (worker_get_work()) { fp = self->w_fp; assert(fp->fp_worker == self); @@ -227,7 +284,7 @@ static void *worker_main(void *arg) busy--; } - return(NULL); /* Unreachable */ + return(NULL); } /*===========================================================================* @@ -367,6 +424,18 @@ void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr, worker_try_activate(rfp, use_spare); } +/*===========================================================================* + * worker_yield * + *===========================================================================*/ +void worker_yield(void) +{ +/* Yield to all worker threads. To be called from the main thread only. */ + + mthread_yield_all(); + + self = NULL; +} + /*===========================================================================* * worker_sleep * *===========================================================================*/