From b2f366a2212bb5f87fb1ada932c4082fdfd77931 Mon Sep 17 00:00:00 2001
From: yury
Date: Wed, 19 Nov 2008 14:18:37 +0000
Subject: [PATCH] b=17686 r=shadow,panda

- fixes race in ptlrpcd caused busy obd and inability to cleanup;
- cleanups and debugs in llcd code.
---
 lustre/include/lustre_net.h  |   9 ++--
 lustre/obdclass/genops.c     |  12 ++---
 lustre/ptlrpc/client.c       |  12 ++---
 lustre/ptlrpc/ptlrpcd.c      |  26 +++++-----
 lustre/ptlrpc/recov_thread.c | 110 ++++++++++++++++++++++++++-----------------
 5 files changed, 98 insertions(+), 71 deletions(-)

diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 1bb9ac2..9d7f95b 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -751,7 +751,7 @@ struct ptlrpc_service {
 
 struct ptlrpcd_ctl {
         /**
-         * Ptlrpc thread control flags (LIOD_START, LIOD_STOP, LIOD_STOP_FORCE)
+         * Ptlrpc thread control flags (LIOD_START, LIOD_STOP, LIOD_FORCE)
          */
         unsigned long pc_flags;
         /**
@@ -810,10 +810,11 @@ enum ptlrpcd_ctl_flags {
          */
         LIOD_STOP = 1 << 1,
         /**
-         * Ptlrpc thread stop force flag. This will cause also
-         * aborting any inflight rpcs handled by thread.
+         * Ptlrpc thread force flag (only stop force so far).
+         * This will cause aborting any inflight rpcs handled
+         * by thread if LIOD_STOP is specified.
          */
-        LIOD_STOP_FORCE = 1 << 2,
+        LIOD_FORCE = 1 << 2,
         /**
          * This is a recovery ptlrpc thread.
         */
diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c
index 854b5c6..edd6dfb 100644
--- a/lustre/obdclass/genops.c
+++ b/lustre/obdclass/genops.c
@@ -813,8 +813,9 @@ struct obd_import *class_import_get(struct obd_import *import)
         LASSERT(atomic_read(&import->imp_refcount) >= 0);
         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
         atomic_inc(&import->imp_refcount);
-        CDEBUG(D_INFO, "import %p refcount=%d\n", import,
-               atomic_read(&import->imp_refcount));
+        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
+               atomic_read(&import->imp_refcount),
+               import->imp_obd->obd_name);
         return import;
 }
 EXPORT_SYMBOL(class_import_get);
@@ -827,13 +828,12 @@ void class_import_put(struct obd_import *import)
         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
         LASSERT(list_empty(&import->imp_zombie_chain));
 
-        CDEBUG(D_INFO, "import %p refcount=%d\n", import,
-               atomic_read(&import->imp_refcount) - 1);
+        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
+               atomic_read(&import->imp_refcount) - 1,
+               import->imp_obd->obd_name);
 
         if (atomic_dec_and_test(&import->imp_refcount)) {
-                CDEBUG(D_INFO, "final put import %p\n", import);
-
                 spin_lock(&obd_zombie_impexp_lock);
                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
                 spin_unlock(&obd_zombie_impexp_lock);
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 165dd29..000904c 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -2445,24 +2445,24 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
 
 void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 {
-        struct list_head *tmp, *n;
+        struct list_head *tmp, *pos;
 
         LASSERT(set != NULL);
-        list_for_each_safe(tmp, n, &set->set_requests) {
+        list_for_each_safe(pos, tmp, &set->set_requests) {
                 struct ptlrpc_request *req =
-                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+                        list_entry(pos, struct ptlrpc_request, rq_set_chain);
 
-                spin_lock (&req->rq_lock);
+                spin_lock(&req->rq_lock);
                 if (req->rq_phase != RQ_PHASE_RPC) {
-                        spin_unlock (&req->rq_lock);
+                        spin_unlock(&req->rq_lock);
                         continue;
                 }
 
                 req->rq_err = 1;
                 req->rq_status = -EINTR;
                 ptlrpc_client_wake_req(req);
-                spin_unlock (&req->rq_lock);
+                spin_unlock(&req->rq_lock);
         }
 }
diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c
index 30c2823..a52e62b 100644
--- a/lustre/ptlrpc/ptlrpcd.c
+++ b/lustre/ptlrpc/ptlrpcd.c
@@ -146,9 +146,6 @@ static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
         int rc = 0;
         ENTRY;
 
-        if (test_bit(LIOD_STOP, &pc->pc_flags))
-                RETURN(1);
-
         spin_lock(&pc->pc_set->set_new_req_lock);
         list_for_each_safe(pos, tmp, &pc->pc_set->set_new_requests) {
                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
@@ -202,7 +199,7 @@ static int ptlrpcd(void *arg)
 {
         struct ptlrpcd_ctl *pc = arg;
         struct lu_env env = { .le_ses = NULL };
-        int rc;
+        int rc, exit = 0;
         ENTRY;
 
         rc = cfs_daemonize_ctxt(pc->pc_name);
@@ -221,13 +218,14 @@ static int ptlrpcd(void *arg)
         if (rc != 0)
                 RETURN(rc);
         env.le_ctx.lc_cookie = 0x7;
+
         /*
          * This mainloop strongly resembles ptlrpc_set_wait() except that our
          * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when
          * there are requests in the set. New requests come in on the set's
          * new_req_list and ptlrpcd_check() moves them into the set.
          */
-        while (1) {
+        do {
                 struct l_wait_info lwi;
                 int timeout;
 
@@ -259,12 +257,17 @@ static int ptlrpcd(void *arg)
                 /*
                  * Abort inflight rpcs for forced stop case.
                  */
-                if (test_bit(LIOD_STOP_FORCE, &pc->pc_flags))
-                        ptlrpc_abort_set(pc->pc_set);
+                if (test_bit(LIOD_STOP, &pc->pc_flags)) {
+                        if (test_bit(LIOD_FORCE, &pc->pc_flags))
+                                ptlrpc_abort_set(pc->pc_set);
+                        exit++;
+                }
 
-                if (test_bit(LIOD_STOP, &pc->pc_flags))
-                        break;
-        }
+                /*
+                 * Let's make one more loop to make sure that ptlrpcd_check()
+                 * copied all raced new rpcs into the set so we can kill them.
+                 */
+        } while (exit < 2);
 
         /*
          * Wait for inflight requests to drain.
@@ -276,6 +279,7 @@ static int ptlrpcd(void *arg)
 
         clear_bit(LIOD_START, &pc->pc_flags);
         clear_bit(LIOD_STOP, &pc->pc_flags);
+        clear_bit(LIOD_FORCE, &pc->pc_flags);
         return 0;
 }
 
@@ -382,7 +386,7 @@ void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
 
         set_bit(LIOD_STOP, &pc->pc_flags);
         if (force)
-                set_bit(LIOD_STOP_FORCE, &pc->pc_flags);
+                set_bit(LIOD_FORCE, &pc->pc_flags);
         cfs_waitq_signal(&pc->pc_set->set_waitq);
 #ifdef __KERNEL__
         wait_for_completion(&pc->pc_finishing);
diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c
index 6fa95a70..1cb65fd 100644
--- a/lustre/ptlrpc/recov_thread.c
+++ b/lustre/ptlrpc/recov_thread.c
@@ -76,15 +76,27 @@ enum {
         LLOG_LCM_FL_EXIT = 1 << 1
 };
 
+static void llcd_print(struct llog_canceld_ctxt *llcd,
+                       const char *func, int line)
+{
+        CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
+        CDEBUG(D_RPCTRACE, " size: %d\n", llcd->llcd_size);
+        CDEBUG(D_RPCTRACE, " ctxt: %p\n", llcd->llcd_ctxt);
+        CDEBUG(D_RPCTRACE, " lcm : %p\n", llcd->llcd_lcm);
+        CDEBUG(D_RPCTRACE, " cookiebytes : %d\n", llcd->llcd_cookiebytes);
+}
+
 /**
  * Allocate new llcd from cache, init it and return to caller.
  * Bumps number of objects allocated.
  */
-static struct llog_canceld_ctxt *llcd_alloc(void)
+static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm)
 {
         struct llog_canceld_ctxt *llcd;
         int llcd_size;
 
+        LASSERT(lcm != NULL);
+
         /*
          * Payload of lustre_msg V2 is bigger.
          */
@@ -98,7 +110,17 @@ static struct llog_canceld_ctxt *llcd_alloc(void)
         CFS_INIT_LIST_HEAD(&llcd->llcd_list);
         llcd->llcd_size = llcd_size;
         llcd->llcd_cookiebytes = 0;
+
+        spin_lock(&lcm->lcm_lock);
+        llcd->llcd_lcm = lcm;
+        atomic_inc(&lcm->lcm_count);
+        list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds);
+        spin_unlock(&lcm->lcm_lock);
         atomic_inc(&llcd_count);
+
+        CDEBUG(D_RPCTRACE, "Alloc llcd %p on lcm %p (%d)\n",
+               llcd, lcm, atomic_read(&lcm->lcm_count));
+
         return llcd;
 }
 
@@ -107,9 +129,27 @@ static struct llog_canceld_ctxt *llcd_alloc(void)
  */
 static void llcd_free(struct llog_canceld_ctxt *llcd)
 {
+        struct llog_commit_master *lcm = llcd->llcd_lcm;
+
+        if (lcm) {
+                if (atomic_read(&lcm->lcm_count) == 0) {
+                        CERROR("Invalid llcd free %p\n", llcd);
+                        llcd_print(llcd, __FUNCTION__, __LINE__);
+                        LBUG();
+                }
+                spin_lock(&lcm->lcm_lock);
+                LASSERT(!list_empty(&llcd->llcd_list));
+                list_del_init(&llcd->llcd_list);
+                atomic_dec(&lcm->lcm_count);
+                spin_unlock(&lcm->lcm_lock);
+        }
+
+        CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n",
+               llcd, lcm, atomic_read(&lcm->lcm_count));
+
         LASSERT(atomic_read(&llcd_count) > 0);
-        OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size);
         atomic_dec(&llcd_count);
+        OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size);
 }
 
@@ -128,22 +168,12 @@ static void llcd_copy(struct llog_canceld_ctxt *llcd,
  * 1 if yes and 0 otherwise.
  */
 static int llcd_fit(struct llog_canceld_ctxt *llcd,
-                 struct llog_cookie *cookies)
+                    struct llog_cookie *cookies)
 {
         return (llcd->llcd_size - llcd->llcd_cookiebytes) >= sizeof(*cookies);
 }
 
-static void llcd_print(struct llog_canceld_ctxt *llcd,
-                       const char *func, int line)
-{
-        CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
-        CDEBUG(D_RPCTRACE, " size: %d\n", llcd->llcd_size);
-        CDEBUG(D_RPCTRACE, " ctxt: %p\n", llcd->llcd_ctxt);
-        CDEBUG(D_RPCTRACE, " lcm : %p\n", llcd->llcd_lcm);
-        CDEBUG(D_RPCTRACE, " cookiebytes : %d\n", llcd->llcd_cookiebytes);
-}
-
 /**
  * Llcd completion function. Called uppon llcd send finish regardless
 * sending result. Error is passed in @rc. Note, that this will be called
@@ -154,7 +184,7 @@ llcd_interpret(const struct lu_env *env,
                struct ptlrpc_request *req, void *noused, int rc)
 {
         struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0];
-        CDEBUG(D_RPCTRACE, "Sent llcd %p (%d)\n", llcd, rc);
+        CDEBUG(D_RPCTRACE, "Sent llcd %p (%d) - killing it\n", llcd, rc);
         llcd_free(llcd);
         return 0;
 }
@@ -254,21 +284,15 @@ exit:
 static int
 llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd)
 {
-        struct llog_commit_master *lcm;
-
         LASSERT(ctxt != NULL && llcd != NULL);
         LASSERT_SEM_LOCKED(&ctxt->loc_sem);
         LASSERT(ctxt->loc_llcd == NULL);
-        lcm = ctxt->loc_lcm;
-        spin_lock(&lcm->lcm_lock);
-        atomic_inc(&lcm->lcm_count);
-        list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds);
-        spin_unlock(&lcm->lcm_lock);
-        CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p (%d)\n",
-               llcd, ctxt, atomic_read(&lcm->lcm_count));
         llcd->llcd_ctxt = llog_ctxt_get(ctxt);
-        llcd->llcd_lcm = ctxt->loc_lcm;
         ctxt->loc_llcd = llcd;
+
+        CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p\n",
+               llcd, ctxt);
+
         return 0;
 }
 
@@ -278,7 +302,6 @@ llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd)
  */
 static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
 {
-        struct llog_commit_master *lcm;
         struct llog_canceld_ctxt *llcd;
 
         LASSERT(ctxt != NULL);
@@ -288,22 +311,10 @@ static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
         if (!llcd)
                 return NULL;
 
-        lcm = ctxt->loc_lcm;
-        if (atomic_read(&lcm->lcm_count) == 0) {
-                CERROR("Invalid detach occured %p:%p\n", ctxt, llcd);
-                llcd_print(llcd, __FUNCTION__, __LINE__);
-                LBUG();
-        }
-        spin_lock(&lcm->lcm_lock);
-        LASSERT(!list_empty(&llcd->llcd_list));
-        list_del_init(&llcd->llcd_list);
-        atomic_dec(&lcm->lcm_count);
-        spin_unlock(&lcm->lcm_lock);
-        ctxt->loc_llcd = NULL;
-
-        CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p (%d)\n",
-               llcd, ctxt, atomic_read(&lcm->lcm_count));
+        CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p\n",
+               llcd, ctxt);
+        ctxt->loc_llcd = NULL;
         llog_ctxt_put(ctxt);
         return llcd;
 }
 
@@ -316,9 +327,9 @@ static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
 {
         struct llog_canceld_ctxt *llcd;
 
-        llcd = llcd_alloc();
+        llcd = llcd_alloc(ctxt->loc_lcm);
         if (!llcd) {
-                CERROR("Couldn't alloc an llcd for ctxt %p\n", ctxt);
+                CERROR("Can't alloc an llcd for ctxt %p\n", ctxt);
                 return NULL;
         }
         llcd_attach(ctxt, llcd);
@@ -330,10 +341,8 @@ static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
  */
 static void llcd_put(struct llog_ctxt *ctxt)
 {
-        struct llog_commit_master *lcm;
         struct llog_canceld_ctxt *llcd;
 
-        lcm = ctxt->loc_lcm;
         llcd = llcd_detach(ctxt);
         if (llcd)
                 llcd_free(llcd);
@@ -423,6 +432,18 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
                         llcd_print(llcd, __FUNCTION__, __LINE__);
                 }
                 spin_unlock(&lcm->lcm_lock);
+
+                /*
+                 * No point to go further with busy llcds at this point
+                 * as this is clear bug. It might mean we got hanging
+                 * rpc which holds import ref and this means we will not
+                 * be able to cleanup anyways.
+                 *
+                 * Or we just missed to kill them when they were not
+                 * attached to ctxt. In this case our slab will remind
+                 * us about this a bit later.
+                 */
+                LBUG();
         }
         EXIT;
 }
@@ -626,6 +647,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
          * then do it.
          */
         if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) {
+                CDEBUG(D_RPCTRACE, "Sync llcd %p\n", llcd);
                 rc = llcd_push(ctxt);
                 if (rc)
                         GOTO(out, rc);
-- 
1.8.3.1
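
Note on the fix itself: the heart of the change is the reworked main loop in ptlrpcd() above. Instead of having ptlrpcd_check() bail out as soon as LIOD_STOP is set, the thread now keeps processing and runs one extra pass after it first sees LIOD_STOP, so any rpcs that raced onto set_new_requests while the stop was in flight still get moved into the set and aborted (under LIOD_FORCE) rather than pinning the import and leaving the obd busy at cleanup. The sketch below is an illustrative, standalone rendering of that two-pass shutdown pattern; the names (stop, queued, in_set, check_set) are hypothetical stand-ins and are not Lustre code.

/*
 * Illustrative two-pass shutdown loop (hypothetical names, not Lustre code).
 * Build: cc -o twopass twopass.c
 */
#include <stdio.h>

static int stop;          /* stands in for LIOD_STOP                  */
static int queued = 3;    /* rpcs racing in on set_new_requests       */
static int in_set;        /* rpcs already moved into the rpc set      */

/* Stands in for ptlrpcd_check(): move newly queued rpcs into the set. */
static void check_set(void)
{
        in_set += queued;
        queued = 0;
}

int main(void)
{
        int exit = 0;

        stop = 1;                       /* ptlrpcd_stop() was called   */
        do {
                check_set();            /* pick up raced-in requests   */
                if (stop) {
                        in_set = 0;     /* force case: abort the set   */
                        exit++;
                }
                /*
                 * Loop once more after seeing the stop flag, so requests
                 * queued concurrently are also picked up and aborted.
                 */
        } while (exit < 2);

        printf("requests left behind: %d\n", in_set + queued);
        return 0;
}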