X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Frecov_thread.c;h=dee77cdc349cba6291b6bb4c7d544b5199c082cf;hp=c10331f8ddb830cbe0a17711ebf05e776f5566cb;hb=9fb5d1ff837880a7596d98f24ff6bb1bf0033f2b;hpb=e3a7c58aebafce40323db54bf6056029e5af4a70 diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index c10331f..dee77cd 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -27,7 +27,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, Whamcloud, Inc. + * Copyright (c) 2011, 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -47,10 +47,6 @@ #define DEBUG_SUBSYSTEM S_LOG -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif - #ifdef __KERNEL__ # include #else @@ -108,7 +104,7 @@ static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm) */ size = CFS_PAGE_SIZE - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); overhead = offsetof(struct llog_canceld_ctxt, llcd_cookies); - OBD_SLAB_ALLOC(llcd, llcd_cache, CFS_ALLOC_STD, size + overhead); + OBD_SLAB_ALLOC_GFP(llcd, llcd_cache, size + overhead, CFS_ALLOC_STD); if (!llcd) return NULL; @@ -116,11 +112,11 @@ static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm) llcd->llcd_cookiebytes = 0; llcd->llcd_size = size; - cfs_spin_lock(&lcm->lcm_lock); - llcd->llcd_lcm = lcm; - cfs_atomic_inc(&lcm->lcm_count); - cfs_list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds); - cfs_spin_unlock(&lcm->lcm_lock); + spin_lock(&lcm->lcm_lock); + llcd->llcd_lcm = lcm; + cfs_atomic_inc(&lcm->lcm_count); + cfs_list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds); + spin_unlock(&lcm->lcm_lock); cfs_atomic_inc(&llcd_count); CDEBUG(D_RPCTRACE, "Alloc llcd %p on lcm %p (%d)\n", @@ -143,11 +139,11 @@ static void llcd_free(struct llog_canceld_ctxt *llcd) llcd_print(llcd, __FUNCTION__, __LINE__); LBUG(); } - cfs_spin_lock(&lcm->lcm_lock); - LASSERT(!cfs_list_empty(&llcd->llcd_list)); - cfs_list_del_init(&llcd->llcd_list); - cfs_atomic_dec(&lcm->lcm_count); - cfs_spin_unlock(&lcm->lcm_lock); + spin_lock(&lcm->lcm_lock); + LASSERT(!cfs_list_empty(&llcd->llcd_list)); + cfs_list_del_init(&llcd->llcd_list); + cfs_atomic_dec(&lcm->lcm_count); + spin_unlock(&lcm->lcm_lock); CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n", llcd, lcm, cfs_atomic_read(&lcm->lcm_count)); @@ -234,7 +230,7 @@ static int llcd_send(struct llog_canceld_ctxt *llcd) * Check if we're in exit stage. Do not send llcd in * this case. */ - if (cfs_test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) GOTO(exit, rc = -ENODEV); CDEBUG(D_RPCTRACE, "Sending llcd %p\n", llcd); @@ -422,13 +418,14 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force) * Let all know that we're stopping. This will also make * llcd_send() refuse any new llcds. */ - cfs_set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags); + set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags); /* * Stop processing thread. No new rpcs will be accepted for * for processing now. */ ptlrpcd_stop(&lcm->lcm_pc, force); + ptlrpcd_free(&lcm->lcm_pc); /* * By this point no alive inflight llcds should be left. Only @@ -442,13 +439,13 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force) CERROR("Busy llcds found (%d) on lcm %p\n", cfs_atomic_read(&lcm->lcm_count), lcm); - cfs_spin_lock(&lcm->lcm_lock); - cfs_list_for_each(tmp, &lcm->lcm_llcds) { - llcd = cfs_list_entry(tmp, struct llog_canceld_ctxt, - llcd_list); - llcd_print(llcd, __FUNCTION__, __LINE__); - } - cfs_spin_unlock(&lcm->lcm_lock); + spin_lock(&lcm->lcm_lock); + cfs_list_for_each(tmp, &lcm->lcm_llcds) { + llcd = cfs_list_entry(tmp, struct llog_canceld_ctxt, + llcd_list); + llcd_print(llcd, __func__, __LINE__); + } + spin_unlock(&lcm->lcm_lock); /* * No point to go further with busy llcds at this point @@ -487,7 +484,7 @@ struct llog_commit_master *llog_recov_thread_init(char *name) cfs_atomic_set(&lcm->lcm_count, 0); cfs_atomic_set(&lcm->lcm_refcount, 1); - cfs_spin_lock_init(&lcm->lcm_lock); + spin_lock_init(&lcm->lcm_lock); CFS_INIT_LIST_HEAD(&lcm->lcm_llcds); rc = llog_recov_thread_start(lcm); if (rc) { @@ -566,15 +563,15 @@ int llog_obd_repl_connect(struct llog_ctxt *ctxt, * Send back cached llcd from llog before recovery if we have any. * This is void is nothing cached is found there. */ - llog_sync(ctxt, NULL); + llog_sync(ctxt, NULL, 0); /* * Start recovery in separate thread. */ - cfs_mutex_lock(&ctxt->loc_mutex); + mutex_lock(&ctxt->loc_mutex); ctxt->loc_gen = *gen; rc = llog_recov_thread_replay(ctxt, ctxt->llog_proc_cb, logid); - cfs_mutex_unlock(&ctxt->loc_mutex); + mutex_unlock(&ctxt->loc_mutex); RETURN(rc); } @@ -585,9 +582,9 @@ EXPORT_SYMBOL(llog_obd_repl_connect); * log record for the deletion. The commit callback calls this * function. */ -int llog_obd_repl_cancel(struct llog_ctxt *ctxt, - struct lov_stripe_md *lsm, int count, - struct llog_cookie *cookies, int flags) +int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt, + struct lov_stripe_md *lsm, int count, + struct llog_cookie *cookies, int flags) { struct llog_commit_master *lcm; struct llog_canceld_ctxt *llcd; @@ -596,7 +593,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, LASSERT(ctxt != NULL); - cfs_mutex_lock(&ctxt->loc_mutex); + mutex_lock(&ctxt->loc_mutex); if (!ctxt->loc_lcm) { CDEBUG(D_RPCTRACE, "No lcm for ctxt %p\n", ctxt); GOTO(out, rc = -ENODEV); @@ -608,12 +605,17 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, * Let's check if we have all structures alive. We also check for * possible shutdown. Do nothing if we're stopping. */ - if (ctxt->loc_imp == NULL) { + if (ctxt->loc_flags & LLOG_CTXT_FLAG_STOP) { + CDEBUG(D_RPCTRACE, "Last sync was done for ctxt %p\n", ctxt); + GOTO(out, rc = -ENODEV); + } + + if (ctxt->loc_imp == NULL) { CDEBUG(D_RPCTRACE, "No import for ctxt %p\n", ctxt); GOTO(out, rc = -ENODEV); } - if (cfs_test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) { + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) { CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n", ctxt); GOTO(out, rc = -ENODEV); @@ -633,7 +635,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, * Allocation is successful, let's check for stop * flag again to fall back as soon as possible. */ - if (cfs_test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) GOTO(out, rc = -ENODEV); } @@ -652,7 +654,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, * Allocation is successful, let's check for stop * flag again to fall back as soon as possible. */ - if (cfs_test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) GOTO(out, rc = -ENODEV); } @@ -677,12 +679,17 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, out: if (rc) llcd_put(ctxt); - cfs_mutex_unlock(&ctxt->loc_mutex); + + if (flags & OBD_LLOG_FL_EXIT) + ctxt->loc_flags = LLOG_CTXT_FLAG_STOP; + + mutex_unlock(&ctxt->loc_mutex); return rc; } EXPORT_SYMBOL(llog_obd_repl_cancel); -int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp) +int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp, + int flags) { int rc = 0; ENTRY; @@ -690,7 +697,7 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp) /* * Flush any remaining llcd. */ - cfs_mutex_lock(&ctxt->loc_mutex); + mutex_lock(&ctxt->loc_mutex); if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) { /* * This is ost->mds connection, we can't be sure that mds @@ -698,7 +705,11 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp) */ CDEBUG(D_RPCTRACE, "Kill cached llcd\n"); llcd_put(ctxt); - cfs_mutex_unlock(&ctxt->loc_mutex); + + if (flags & OBD_LLOG_FL_EXIT) + ctxt->loc_flags = LLOG_CTXT_FLAG_STOP; + + mutex_unlock(&ctxt->loc_mutex); } else { /* * This is either llog_sync() from generic llog code or sync @@ -706,8 +717,9 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp) * llcds to the target with waiting for completion. */ CDEBUG(D_RPCTRACE, "Sync cached llcd\n"); - cfs_mutex_unlock(&ctxt->loc_mutex); - rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW); + mutex_unlock(&ctxt->loc_mutex); + rc = llog_cancel(NULL, ctxt, NULL, 0, NULL, + OBD_LLOG_FL_SENDNOW | flags); } RETURN(rc); } @@ -715,7 +727,7 @@ EXPORT_SYMBOL(llog_obd_repl_sync); #else /* !__KERNEL__ */ -int llog_obd_repl_cancel(struct llog_ctxt *ctxt, +int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags) {