int flags;
vir_bytes buff_addr;
size_t buff_len;
- uint8_t padding[24];
+ int prepare_state;
+ uint8_t padding[20];
} mess_rs_init;
_ASSERT_MSG_SIZE(mess_rs_init);
void* init_buff_cleanup_start;
size_t init_buff_len;
int copy_flags;
+ int prepare_state;
} sef_init_info_t;
/* Callback type definitions. */
info.init_buff_start = (void*) m_ptr->m_rs_init.buff_addr;
info.init_buff_cleanup_start = info.init_buff_start;
info.init_buff_len = m_ptr->m_rs_init.buff_len;
+ info.prepare_state = m_ptr->m_rs_init.prepare_state;
	/* Perform initialization. */
r = process_init(type, &info);
*===========================================================================*/
int init_service(struct rproc *rp, int type, int flags)
{
- int r;
+ int r, prepare_state;
message m;
endpoint_t old_endpoint;
/* Determine the old endpoint if this is a new instance. */
old_endpoint = NONE;
+ prepare_state = SEF_LU_STATE_NULL;
if(rp->r_old_rp) {
old_endpoint = rp->r_upd.state_endpoint;
+ prepare_state = rp->r_upd.prepare_state;
}
else if(rp->r_prev_rp) {
old_endpoint = rp->r_prev_rp->r_pub->endpoint;
m.m_rs_init.restarts = (short) rp->r_restarts+1;
m.m_rs_init.buff_addr = rp->r_map_prealloc_addr;
m.m_rs_init.buff_len = rp->r_map_prealloc_len;
+ m.m_rs_init.prepare_state = prepare_state;
rp->r_map_prealloc_addr = 0;
rp->r_map_prealloc_len = 0;
r = rs_asynsend(rp, &m, 0);
/* SEF functions and variables. */
static void sef_local_startup(void);
static int sef_cb_init_fresh(int type, sef_init_info_t *info);
+static int sef_cb_init_lu(int type, sef_init_info_t *info);
/*===========================================================================*
* main *
/* This is the main loop that gets work, processes it, and sends replies. */
while (TRUE) {
- yield_all(); /* let other threads run */
- self = NULL;
+ worker_yield(); /* let other threads run */
+
send_work();
/* The get_work() function returns TRUE if we have a new message to
if (error != SUSPEND) reply(&job_m_out, fp->fp_endpoint, error);
}
+/*===========================================================================*
+ * sef_cb_lu_prepare *
+ *===========================================================================*/
+static int sef_cb_lu_prepare(int state)
+{
+/* This function is called to decide whether we can enter the given live
+ * update state, and to prepare for such an update. If we are requested to
+ * update to a request-free or protocol-free state, make sure there is no work
+ * pending or being processed, and shut down all worker threads.
+ *
+ * Returns OK if the update may proceed, or ENOTREADY otherwise.
+ */
+
+	switch (state) {
+	case SEF_LU_STATE_REQUEST_FREE:
+	case SEF_LU_STATE_PROTOCOL_FREE:
+		if (!worker_idle()) {
+			printf("VFS: worker threads not idle, blocking update\n");
+			break;
+		}
+
+		/* All workers are idle: tear them down now, so that no
+		 * thread stacks have to be part of the state transfer.
+		 */
+		worker_cleanup();
+
+		return OK;
+	}
+
+	/* Busy workers, or a state we do not support: refuse for now. */
+	return ENOTREADY;
+}
+
+/*===========================================================================*
+ * sef_cb_lu_state_changed *
+ *===========================================================================*/
+static void sef_cb_lu_state_changed(int old_state, int state)
+{
+/* Worker threads (especially their stacks) pose a serious problem for state
+ * transfer during live update, and therefore, we shut down all worker threads
+ * during live update and restart them afterwards. This function is called in
+ * the old VFS instance when the state changed. We use it to restart worker
+ * threads after a failed live update.
+ */
+
+	/* Only a transition back to the regular (NULL) state is of
+	 * interest here; it means the live update attempt has ended.
+	 */
+	if (state != SEF_LU_STATE_NULL)
+		return;
+
+	switch (old_state) {
+	case SEF_LU_STATE_REQUEST_FREE:
+	case SEF_LU_STATE_PROTOCOL_FREE:
+		/* For these states the workers were shut down during
+		 * preparation; bring them back up in this (old) instance.
+		 */
+		worker_init();
+	}
+}
+
+/*===========================================================================*
+ * sef_cb_init_lu *
+ *===========================================================================*/
+static int sef_cb_init_lu(int type, sef_init_info_t *info)
+{
+/* This function is called in the new VFS instance during a live update.
+ * Returns OK on success, or an error code from the default state transfer.
+ */
+	int r;
+
+	/* Perform regular state transfer. */
+	if ((r = SEF_CB_INIT_LU_DEFAULT(type, info)) != OK)
+		return r;
+
+	/* Recreate worker threads, if necessary. */
+	switch (info->prepare_state) {
+	case SEF_LU_STATE_REQUEST_FREE:
+	case SEF_LU_STATE_PROTOCOL_FREE:
+		/* The old instance shut down its workers when preparing for
+		 * this state, so this new instance must start fresh ones.
+		 */
+		worker_init();
+	}
+
+	return OK;
+}
+
/*===========================================================================*
* sef_local_startup *
*===========================================================================*/
-static void sef_local_startup()
+static void sef_local_startup(void)
{
  /* Register init callbacks. */
  sef_setcb_init_fresh(sef_cb_init_fresh);
  sef_setcb_init_restart(SEF_CB_INIT_RESTART_STATEFUL);
+ /* Register live update callbacks. */
+ sef_setcb_init_lu(sef_cb_init_lu);
+ sef_setcb_lu_prepare(sef_cb_lu_prepare);
+ sef_setcb_lu_state_changed(sef_cb_lu_state_changed);
+ /* Accept only the standard set of live update states. */
+ sef_setcb_lu_state_isvalid(sef_cb_lu_state_isvalid_standard);
+
  /* Let SEF perform startup. */
  sef_startup();
}
/* worker.c */
void worker_init(void);
+void worker_cleanup(void);
+int worker_idle(void);
int worker_available(void);
void worker_allow(int allow);
struct worker_thread *worker_get(thread_t worker_tid);
int use_spare);
void worker_stop(struct worker_thread *worker);
void worker_stop_by_endpt(endpoint_t proc_e);
+void worker_yield(void);
void worker_wait(void);
struct worker_thread *worker_suspend(void);
void worker_resume(struct worker_thread *org_self);
#define cond_t mthread_cond_t
#define attr_t mthread_attr_t
-#define yield mthread_yield
-#define yield_all mthread_yield_all
-
#define mutex_init mthread_mutex_init
#define mutex_destroy mthread_mutex_destroy
#define mutex_lock mthread_mutex_lock
#include "fs.h"
+#include <string.h>
#include <assert.h>
-static void worker_get_work(void);
static void *worker_main(void *arg);
static void worker_sleep(void);
static void worker_wake(struct worker_thread *worker);
*===========================================================================*/
void worker_init(void)
{
-/* Initialize worker thread */
+/* Initialize worker threads */
struct worker_thread *wp;
int i;
panic("failed to initialize attribute");
if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
panic("couldn't set default thread stack size");
- if (mthread_attr_setdetachstate(&tattr, MTHREAD_CREATE_DETACHED) != 0)
- panic("couldn't set default thread detach state");
+
pending = 0;
busy = 0;
block_all = FALSE;
if (mutex_init(&wp->w_event_mutex, NULL) != 0)
panic("failed to initialize mutex");
if (cond_init(&wp->w_event, NULL) != 0)
- panic("failed to initialize conditional variable");
+ panic("failed to initialize condition variable");
if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
panic("unable to start thread");
}
/* Let all threads get ready to accept work. */
- yield_all();
+ worker_yield();
+}
+
+/*===========================================================================*
+ * worker_cleanup *
+ *===========================================================================*/
+void worker_cleanup(void)
+{
+/* Clean up worker threads, reversing the actions of worker_init() such that
+ * we can safely call worker_init() again later. All worker threads are
+ * expected to be idle already. Used for live updates, because transferring
+ * the thread stacks from one version to another is currently not feasible.
+ */
+	struct worker_thread *wp;
+	int i;
+
+	/* The caller must have drained all work before getting here. */
+	assert(worker_idle());
+
+	/* First terminate all threads. */
+	for (i = 0; i < NR_WTHREADS; i++) {
+		wp = &workers[i];
+
+		assert(wp->w_fp == NULL);
+
+		/* Waking up the thread with no w_fp will cause it to exit. */
+		worker_wake(wp);
+	}
+
+	/* Give the woken threads a chance to run and terminate. */
+	worker_yield();
+
+	/* Then clean up their resources. */
+	for (i = 0; i < NR_WTHREADS; i++) {
+		wp = &workers[i];
+
+		if (mthread_join(wp->w_tid, NULL) != 0)
+			panic("worker_cleanup: could not join thread %d", i);
+		if (cond_destroy(&wp->w_event) != 0)
+			panic("failed to destroy condition variable");
+		if (mutex_destroy(&wp->w_event_mutex) != 0)
+			panic("failed to destroy mutex");
+	}
+
+	/* Finally, clean up global resources. */
+	if (mthread_attr_destroy(&tattr) != 0)
+		panic("failed to destroy attribute");
+
+	/* Zero the table so a later worker_init() starts from scratch. */
+	memset(workers, 0, sizeof(workers));
+}
+
+/*===========================================================================*
+ *				worker_idle				     *
+ *===========================================================================*/
+int worker_idle(void)
+{
+/* Return whether all worker threads are idle. */
+
+	/* Idle means: no work queued as pending, and no worker busy. */
+	return (pending == 0 && busy == 0);
}
/*===========================================================================*
/*===========================================================================*
* worker_get_work *
*===========================================================================*/
-static void worker_get_work(void)
+static int worker_get_work(void)
{
/* Find new work to do. Work can be 'queued', 'pending', or absent. In the
- * latter case wait for new work to come in.
+ * latter case wait for new work to come in. Return TRUE if there is work to
+ * do, or FALSE if the current thread is requested to shut down.
*/
struct fproc *rfp;
rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
assert(pending > 0);
pending--;
- return;
+ return TRUE;
}
}
panic("Pending work inconsistency");
/* Wait for work to come to us */
worker_sleep();
+
+ return (self->w_fp != NULL);
}
/*===========================================================================*
self = (struct worker_thread *) arg;
ASSERTW(self);
- while(TRUE) {
- worker_get_work();
+ while (worker_get_work()) {
fp = self->w_fp;
assert(fp->fp_worker == self);
busy--;
}
- return(NULL); /* Unreachable */
+ return(NULL);
}
/*===========================================================================*
worker_try_activate(rfp, use_spare);
}
+/*===========================================================================*
+ * worker_yield *
+ *===========================================================================*/
+void worker_yield(void)
+{
+/* Yield to all worker threads. To be called from the main thread only. */
+
+	mthread_yield_all();
+
+	/* Each worker sets 'self' to its own context while it runs; now
+	 * that control is back in the main thread, clear it again.
+	 */
+	self = NULL;
+}
+
/*===========================================================================*
* worker_sleep *
*===========================================================================*/