X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Frecov_thread.c;h=e90142b090002bdfb412d20159e4b70d5b4b8c5f;hb=da0dca9d54757963322ba3458a21c54800e36571;hp=6fa95a708ebb3e18ce49941b13f37c8438f9bb07;hpb=00ab5a4e3a300c2d1b64be54780d265cb1b13b97;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index 6fa95a70..e90142b 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -76,29 +76,52 @@ enum { LLOG_LCM_FL_EXIT = 1 << 1 }; +static void llcd_print(struct llog_canceld_ctxt *llcd, + const char *func, int line) +{ + CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line); + CDEBUG(D_RPCTRACE, " size: %d\n", llcd->llcd_size); + CDEBUG(D_RPCTRACE, " ctxt: %p\n", llcd->llcd_ctxt); + CDEBUG(D_RPCTRACE, " lcm : %p\n", llcd->llcd_lcm); + CDEBUG(D_RPCTRACE, " cookiebytes : %d\n", llcd->llcd_cookiebytes); +} + /** * Allocate new llcd from cache, init it and return to caller. * Bumps number of objects allocated. */ -static struct llog_canceld_ctxt *llcd_alloc(void) +static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm) { struct llog_canceld_ctxt *llcd; - int llcd_size; + int size, overhead; + + LASSERT(lcm != NULL); /* - * Payload of lustre_msg V2 is bigger. + * We want to send one page of cookies with rpc header. This buffer + * will be assigned later to the rpc, this is why we preserve the + * space for rpc header. */ - llcd_size = CFS_PAGE_SIZE - - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); - llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies); - OBD_SLAB_ALLOC(llcd, llcd_cache, CFS_ALLOC_STD, llcd_size); + size = CFS_PAGE_SIZE - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); + overhead = offsetof(struct llog_canceld_ctxt, llcd_cookies); + OBD_SLAB_ALLOC(llcd, llcd_cache, CFS_ALLOC_STD, size + overhead); if (!llcd) return NULL; CFS_INIT_LIST_HEAD(&llcd->llcd_list); - llcd->llcd_size = llcd_size; llcd->llcd_cookiebytes = 0; + llcd->llcd_size = size; + + spin_lock(&lcm->lcm_lock); + llcd->llcd_lcm = lcm; + atomic_inc(&lcm->lcm_count); + list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds); + spin_unlock(&lcm->lcm_lock); atomic_inc(&llcd_count); + + CDEBUG(D_RPCTRACE, "Alloc llcd %p on lcm %p (%d)\n", + llcd, lcm, atomic_read(&lcm->lcm_count)); + return llcd; } @@ -107,41 +130,53 @@ static struct llog_canceld_ctxt *llcd_alloc(void) */ static void llcd_free(struct llog_canceld_ctxt *llcd) { + struct llog_commit_master *lcm = llcd->llcd_lcm; + int size; + + if (lcm) { + if (atomic_read(&lcm->lcm_count) == 0) { + CERROR("Invalid llcd free %p\n", llcd); + llcd_print(llcd, __FUNCTION__, __LINE__); + LBUG(); + } + spin_lock(&lcm->lcm_lock); + LASSERT(!list_empty(&llcd->llcd_list)); + list_del_init(&llcd->llcd_list); + atomic_dec(&lcm->lcm_count); + spin_unlock(&lcm->lcm_lock); + + CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n", + llcd, lcm, atomic_read(&lcm->lcm_count)); + } + LASSERT(atomic_read(&llcd_count) > 0); - OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size); atomic_dec(&llcd_count); -} -/** - * Copy passed @cookies to @llcd. - */ -static void llcd_copy(struct llog_canceld_ctxt *llcd, - struct llog_cookie *cookies) -{ - memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, - cookies, sizeof(*cookies)); - llcd->llcd_cookiebytes += sizeof(*cookies); + size = offsetof(struct llog_canceld_ctxt, llcd_cookies) + + llcd->llcd_size; + OBD_SLAB_FREE(llcd, llcd_cache, size); } /** * Checks if passed cookie fits into llcd free space buffer. Returns * 1 if yes and 0 otherwise. */ -static int llcd_fit(struct llog_canceld_ctxt *llcd, - struct llog_cookie *cookies) +static inline int +llcd_fit(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies) { - return (llcd->llcd_size - - llcd->llcd_cookiebytes) >= sizeof(*cookies); + return (llcd->llcd_size - llcd->llcd_cookiebytes >= sizeof(*cookies)); } -static void llcd_print(struct llog_canceld_ctxt *llcd, - const char *func, int line) +/** + * Copy passed @cookies to @llcd. + */ +static inline void +llcd_copy(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies) { - CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line); - CDEBUG(D_RPCTRACE, " size: %d\n", llcd->llcd_size); - CDEBUG(D_RPCTRACE, " ctxt: %p\n", llcd->llcd_ctxt); - CDEBUG(D_RPCTRACE, " lcm : %p\n", llcd->llcd_lcm); - CDEBUG(D_RPCTRACE, " cookiebytes : %d\n", llcd->llcd_cookiebytes); + LASSERT(llcd_fit(llcd, cookies)); + memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, + cookies, sizeof(*cookies)); + llcd->llcd_cookiebytes += sizeof(*cookies); } /** @@ -154,7 +189,7 @@ llcd_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *noused, int rc) { struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0]; - CDEBUG(D_RPCTRACE, "Sent llcd %p (%d)\n", llcd, rc); + CDEBUG(D_RPCTRACE, "Sent llcd %p (%d) - killing it\n", llcd, rc); llcd_free(llcd); return 0; } @@ -235,6 +270,11 @@ static int llcd_send(struct llog_canceld_ctxt *llcd) req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; req->rq_interpret_reply = (ptlrpc_interpterer_t)llcd_interpret; req->rq_async_args.pointer_arg[0] = llcd; + + /* llog cancels will be replayed after reconnect so this will do twice + * first from replay llog, second for resended rpc */ + req->rq_no_delay = req->rq_no_resend = 1; + rc = ptlrpc_set_add_new_req(&lcm->lcm_pc, req); if (rc) { ptlrpc_request_free(req); @@ -254,21 +294,15 @@ exit: static int llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd) { - struct llog_commit_master *lcm; - LASSERT(ctxt != NULL && llcd != NULL); LASSERT_SEM_LOCKED(&ctxt->loc_sem); LASSERT(ctxt->loc_llcd == NULL); - lcm = ctxt->loc_lcm; - spin_lock(&lcm->lcm_lock); - atomic_inc(&lcm->lcm_count); - list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds); - spin_unlock(&lcm->lcm_lock); - CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p (%d)\n", - llcd, ctxt, atomic_read(&lcm->lcm_count)); llcd->llcd_ctxt = llog_ctxt_get(ctxt); - llcd->llcd_lcm = ctxt->loc_lcm; ctxt->loc_llcd = llcd; + + CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p\n", + llcd, ctxt); + return 0; } @@ -278,7 +312,6 @@ llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd) */ static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt) { - struct llog_commit_master *lcm; struct llog_canceld_ctxt *llcd; LASSERT(ctxt != NULL); @@ -288,22 +321,10 @@ static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt) if (!llcd) return NULL; - lcm = ctxt->loc_lcm; - if (atomic_read(&lcm->lcm_count) == 0) { - CERROR("Invalid detach occured %p:%p\n", ctxt, llcd); - llcd_print(llcd, __FUNCTION__, __LINE__); - LBUG(); - } - spin_lock(&lcm->lcm_lock); - LASSERT(!list_empty(&llcd->llcd_list)); - list_del_init(&llcd->llcd_list); - atomic_dec(&lcm->lcm_count); - spin_unlock(&lcm->lcm_lock); - ctxt->loc_llcd = NULL; - - CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p (%d)\n", - llcd, ctxt, atomic_read(&lcm->lcm_count)); + CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p\n", + llcd, ctxt); + ctxt->loc_llcd = NULL; llog_ctxt_put(ctxt); return llcd; } @@ -316,9 +337,9 @@ static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt) { struct llog_canceld_ctxt *llcd; - llcd = llcd_alloc(); + llcd = llcd_alloc(ctxt->loc_lcm); if (!llcd) { - CERROR("Couldn't alloc an llcd for ctxt %p\n", ctxt); + CERROR("Can't alloc an llcd for ctxt %p\n", ctxt); return NULL; } llcd_attach(ctxt, llcd); @@ -330,10 +351,8 @@ static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt) */ static void llcd_put(struct llog_ctxt *ctxt) { - struct llog_commit_master *lcm; struct llog_canceld_ctxt *llcd; - lcm = ctxt->loc_lcm; llcd = llcd_detach(ctxt); if (llcd) llcd_free(llcd); @@ -423,6 +442,18 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force) llcd_print(llcd, __FUNCTION__, __LINE__); } spin_unlock(&lcm->lcm_lock); + + /* + * No point to go further with busy llcds at this point + * as this is clear bug. It might mean we got hanging + * rpc which holds import ref and this means we will not + * be able to cleanup anyways. + * + * Or we just missed to kill them when they were not + * attached to ctxt. In this case our slab will remind + * us about this a bit later. + */ + LBUG(); } EXIT; } @@ -447,7 +478,6 @@ struct llog_commit_master *llog_recov_thread_init(char *name) snprintf(lcm->lcm_name, sizeof(lcm->lcm_name), "ll_log_commit_%s", name); - strncpy(lcm->lcm_name, name, sizeof(lcm->lcm_name)); atomic_set(&lcm->lcm_count, 0); spin_lock_init(&lcm->lcm_lock); CFS_INIT_LIST_HEAD(&lcm->lcm_llcds); @@ -561,6 +591,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, mutex_down(&ctxt->loc_sem); lcm = ctxt->loc_lcm; + CDEBUG(D_INFO, "cancel on lsm %p\n", lcm); /* * Let's check if we have all structures alive. We also check for @@ -626,6 +657,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, * then do it. */ if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) { + CDEBUG(D_RPCTRACE, "Sync llcd %p\n", llcd); rc = llcd_push(ctxt); if (rc) GOTO(out, rc);