From 81be387c988787b86565f1e4087fd20809b6a7c3 Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Tue, 25 Nov 2014 22:24:59 +0800 Subject: [PATCH] LU-5682 lfsck: optimize ldlm lock used by LFSCK When LFSCK repairs some inconsistency, it needs to take related ldlm lock(s) firstly to prevent concurrent modifications or purge client side cache. Originally, to simply the implementation, the LFSCK just simply acquires LCK_EX mode ibits lock(s) on related object(s). But such coarse-grained lock policy may be not efficient for some directory-based modification, such as insert name entry to the directory. This patch introduces lfsck PDO (Parallel Directory Operations) lock for directory-based LFSCK modification, it only locks part of the directory with the given pairs, then allow others to access or modify the different part(s) of the directory in parallel, and also avoid to purge client-side cache unnecessarily. Signed-off-by: Fan Yong Change-Id: I29bad81112c14e3aaecaa2b808e60ea74c10a702 Reviewed-on: http://review.whamcloud.com/12766 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- lustre/lfsck/lfsck_internal.h | 12 +++ lustre/lfsck/lfsck_layout.c | 66 ++++++++------- lustre/lfsck/lfsck_lib.c | 151 +++++++++++++++++++++++++++------- lustre/lfsck/lfsck_namespace.c | 171 ++++++++++++++++++++++++--------------- lustre/lfsck/lfsck_striped_dir.c | 2 +- 5 files changed, 276 insertions(+), 126 deletions(-) diff --git a/lustre/lfsck/lfsck_internal.h b/lustre/lfsck/lfsck_internal.h index f21a7a7..6153b0c 100644 --- a/lustre/lfsck/lfsck_internal.h +++ b/lustre/lfsck/lfsck_internal.h @@ -801,6 +801,13 @@ struct lfsck_assistant_data { #define LFSCK_TMPBUF_LEN 64 +struct lfsck_lock_handle { + struct lustre_handle llh_pdo_lh; + struct lustre_handle llh_reg_lh; + ldlm_mode_t llh_pdo_mode; + ldlm_mode_t llh_reg_mode; +}; + struct lfsck_thread_info { struct lu_name lti_name_const; struct lu_name lti_name; @@ -848,6 +855,7 @@ struct lfsck_thread_info { struct lmv_mds_md_v1 lti_lmv2; struct lmv_mds_md_v1 lti_lmv3; struct lmv_mds_md_v1 lti_lmv4; + struct lfsck_lock_handle lti_llh; }; /* lfsck_lib.c */ @@ -857,6 +865,10 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, struct dt_object *obj, struct lustre_handle *lh, __u64 bits, ldlm_mode_t mode); void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode); +int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck, + struct dt_object *obj, const char *name, + struct lfsck_lock_handle *llh, __u64 bits, ldlm_mode_t mode); +void lfsck_unlock(struct lfsck_lock_handle *llh); int lfsck_find_mdt_idx_by_fid(const struct lu_env *env, struct lfsck_instance *lfsck, const struct lu_fid *fid); diff --git a/lustre/lfsck/lfsck_layout.c b/lustre/lfsck/lfsck_layout.c index 00ef9cf..207bfcf 100644 --- a/lustre/lfsck/lfsck_layout.c +++ b/lustre/lfsck/lfsck_layout.c @@ -1766,7 +1766,7 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env, struct thandle *th = NULL; struct lu_buf *ea_buf = &info->lti_big_buf; struct lu_buf lov_buf; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &info->lti_llh; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; const struct lu_name *pname; @@ -1806,25 +1806,6 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env, LASSERT(infix != NULL); LASSERT(type != NULL); - do { - snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix, - type, idx++); - rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid, - (const struct dt_key *)name, BYPASS_CAPA); - if (rc != 0 && rc != -ENOENT) - GOTO(log, rc); - } while (rc == 0); - - rc = linkea_data_new(&ldata, - &lfsck_env_info(env)->lti_linkea_buf); - if (rc != 0) - GOTO(log, rc); - - pname = lfsck_name_get_const(env, name, strlen(name)); - rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj)); - if (rc != 0) - GOTO(log, rc); - memset(la, 0, sizeof(*la)); la->la_uid = rec->lor_uid; la->la_gid = rec->lor_gid; @@ -1844,17 +1825,43 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env, GOTO(log, rc = -ENOMEM); } - /* Hold update lock on the .lustre/lost+found/MDTxxxx/. - * - * XXX: Currently, we do not grab the PDO lock as normal create cases, - * because creating MDT-object for orphan OST-object is rare, we - * do not much care about the performance. It can be improved in - * the future when needed. */ - rc = lfsck_ibits_lock(env, lfsck, lpf, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); +again: + do { + snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix, + type, idx++); + rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid, + (const struct dt_key *)name, BYPASS_CAPA); + if (rc != 0 && rc != -ENOENT) + GOTO(log, rc); + } while (rc == 0); + + rc = lfsck_lock(env, lfsck, lfsck->li_lpf_obj, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); + /* Re-check whether the name conflict with othrs after taken + * the ldlm lock. */ + rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid, + (const struct dt_key *)name, BYPASS_CAPA); + if (unlikely(rc == 0)) { + lfsck_unlock(llh); + goto again; + } + + if (rc != -ENOENT) + GOTO(unlock, rc); + + rc = linkea_data_new(&ldata, + &lfsck_env_info(env)->lti_linkea_buf); + if (rc != 0) + GOTO(unlock, rc); + + pname = lfsck_name_get_const(env, name, strlen(name)); + rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj)); + if (rc != 0) + GOTO(unlock, rc); + /* The 1st transaction. */ th = dt_trans_create(env, dev); if (IS_ERR(th)) @@ -1920,7 +1927,7 @@ stop: dt_trans_stop(env, dev, th); unlock: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); log: if (cobj != NULL && !IS_ERR(cobj)) @@ -2136,7 +2143,6 @@ static int lfsck_layout_conflict_create(const struct lu_env *env, if (rc != 0) GOTO(out, rc); - /* Hold layout lock on the parent to prevent others to access. */ rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh, MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR, LCK_EX); diff --git a/lustre/lfsck/lfsck_lib.c b/lustre/lfsck/lfsck_lib.c index 6a9e7e0..62b1d8d 100644 --- a/lustre/lfsck/lfsck_lib.c +++ b/lustre/lfsck/lfsck_lib.c @@ -361,29 +361,15 @@ int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck, RETURN(rc); } -/** - * Request the specified ibits lock for the given object. - * - * Before the LFSCK modifying on the namespace visible object, - * it needs to acquire related ibits ldlm lock. - * - * \param[in] env pointer to the thread context - * \param[in] lfsck pointer to the lfsck instance - * \param[in] obj pointer to the dt_object to be locked - * \param[out] lh pointer to the lock handle - * \param[in] ibits the bits for the ldlm lock to be acquired - * \param[in] mode the mode for the ldlm lock to be acquired - * - * \retval 0 for success - * \retval negative error number on failure - */ -int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, - struct dt_object *obj, struct lustre_handle *lh, - __u64 bits, ldlm_mode_t mode) +static int __lfsck_ibits_lock(const struct lu_env *env, + struct lfsck_instance *lfsck, + struct dt_object *obj, + struct ldlm_res_id *resid, + struct lustre_handle *lh, + __u64 bits, ldlm_mode_t mode) { struct lfsck_thread_info *info = lfsck_env_info(env); ldlm_policy_data_t *policy = &info->lti_policy; - struct ldlm_res_id *resid = &info->lti_resid; __u64 flags = LDLM_FL_ATOMIC_CB; int rc; @@ -391,7 +377,6 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = bits; - fid_build_reg_res_name(lfsck_dto2fid(obj), resid); if (dt_object_remote(obj)) { struct ldlm_enqueue_info *einfo = &info->lti_einfo; @@ -422,6 +407,34 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, } /** + * Request the specified ibits lock for the given object. + * + * Before the LFSCK modifying on the namespace visible object, + * it needs to acquire related ibits ldlm lock. + * + * \param[in] env pointer to the thread context + * \param[in] lfsck pointer to the lfsck instance + * \param[in] obj pointer to the dt_object to be locked + * \param[out] lh pointer to the lock handle + * \param[in] bits the bits for the ldlm lock to be acquired + * \param[in] mode the mode for the ldlm lock to be acquired + * + * \retval 0 for success + * \retval negative error number on failure + */ +int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, + struct dt_object *obj, struct lustre_handle *lh, + __u64 bits, ldlm_mode_t mode) +{ + struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid; + + LASSERT(!lustre_handle_is_used(lh)); + + fid_build_reg_res_name(lfsck_dto2fid(obj), resid); + return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode); +} + +/** * Release the the specified ibits lock. * * If the lock has been acquired before, release it @@ -438,6 +451,86 @@ void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode) } } +/** + * Request compound ibits locks for the given pairs. + * + * Before the LFSCK modifying on the namespace visible object, it needs to + * acquire related ibits ldlm lock. Usually, we can use lfsck_ibits_lock for + * the lock purpose. But the simple lfsck_ibits_lock for directory-based + * modificationis (such as insert name entry to the directory) may be too + * coarse-grained and not efficient. + * + * The lfsck_lock() will request compound ibits locks on the specified + * pairs: the PDO (Parallel Directory Operations) ibits (UPDATE) + * lock on the directory object, and the regular ibits lock on the name hash. + * + * \param[in] env pointer to the thread context + * \param[in] lfsck pointer to the lfsck instance + * \param[in] obj pointer to the dt_object to be locked + * \param[in] name used for building the PDO lock resource + * \param[out] llh pointer to the lfsck_lock_handle + * \param[in] bits the bits for the ldlm lock to be acquired + * \param[in] mode the mode for the ldlm lock to be acquired + * + * \retval 0 for success + * \retval negative error number on failure + */ +int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck, + struct dt_object *obj, const char *name, + struct lfsck_lock_handle *llh, __u64 bits, ldlm_mode_t mode) +{ + struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid; + int rc; + + LASSERT(S_ISDIR(lfsck_object_type(obj))); + LASSERT(name != NULL); + LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh)); + LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh)); + + switch (mode) { + case LCK_EX: + llh->llh_pdo_mode = LCK_EX; + break; + case LCK_PW: + llh->llh_pdo_mode = LCK_CW; + break; + case LCK_PR: + llh->llh_pdo_mode = LCK_CR; + break; + default: + CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj " + DFID"\n", lfsck_lfsck2name(lfsck), mode, + PFID(lfsck_dto2fid(obj))); + LBUG(); + } + + fid_build_reg_res_name(lfsck_dto2fid(obj), resid); + rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh, + MDS_INODELOCK_UPDATE, llh->llh_pdo_mode); + if (rc != 0) + return rc; + + llh->llh_reg_mode = mode; + resid->name[LUSTRE_RES_ID_HSH_OFF] = full_name_hash(name, strlen(name)); + rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh, + bits, llh->llh_reg_mode); + if (rc != 0) + lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode); + + return rc; +} + +/** + * Release the the compound ibits locks. + * + * \param[in] llh pointer to the lfsck_lock_handle to be released + */ +void lfsck_unlock(struct lfsck_lock_handle *llh) +{ + lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode); + lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode); +} + int lfsck_find_mdt_idx_by_fid(const struct lu_env *env, struct lfsck_instance *lfsck, const struct lu_fid *fid) @@ -480,12 +573,12 @@ static int lfsck_lpf_remove_name_entry(const struct lu_env *env, struct dt_object *parent = lfsck->li_lpf_root_obj; struct dt_device *dev = lfsck_obj2dev(parent); struct thandle *th; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &lfsck_env_info(env)->lti_llh; int rc; ENTRY; - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, parent, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) RETURN(rc); @@ -520,7 +613,7 @@ stop: dt_trans_stop(env, dev, th); unlock: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n", lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc); @@ -882,7 +975,7 @@ static int lfsck_create_lpf(const struct lu_env *env, struct dt_object_format *dof = &info->lti_dof; struct dt_object *parent = lfsck->li_lpf_root_obj; struct dt_object *child = NULL; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &info->lti_llh; char name[8]; int node = lfsck_dev_idx(lfsck); int rc = 0; @@ -892,8 +985,8 @@ static int lfsck_create_lpf(const struct lu_env *env, LASSERT(parent != NULL); LASSERT(lfsck->li_lpf_obj == NULL); - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, parent, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) RETURN(rc); @@ -951,7 +1044,7 @@ static int lfsck_create_lpf(const struct lu_env *env, GOTO(unlock, rc); unlock: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); if (rc != 0 && child != NULL && !IS_ERR(child)) lfsck_object_put(env, child); diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index 585ab52..7755e33 100644 --- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -917,7 +917,7 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, struct dt_device *dev = lfsck_obj2dev(orphan); struct dt_object *parent; struct thandle *th = NULL; - struct lustre_handle plh = { 0 }; + struct lfsck_lock_handle *pllh = &info->lti_llh; struct lustre_handle clh = { 0 }; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; @@ -934,12 +934,7 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, parent = lfsck->li_lpf_obj; pfid = lfsck_dto2fid(parent); - /* Hold update lock on the parent to prevent others to access. */ - rc = lfsck_ibits_lock(env, lfsck, parent, &plh, - MDS_INODELOCK_UPDATE, LCK_EX); - if (rc != 0) - GOTO(log, rc); - +again: do { namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d", PFID(cfid), infix, type, idx++); @@ -953,6 +948,29 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, exist = true; } while (rc == 0 && !exist); + rc = lfsck_lock(env, lfsck, parent, info->lti_key, pllh, + MDS_INODELOCK_UPDATE, LCK_PW); + if (rc != 0) + GOTO(log, rc); + + /* Re-check whether the name conflict with othrs after taken + * the ldlm lock. */ + rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, + (const struct dt_key *)info->lti_key, BYPASS_CAPA); + if (rc == 0) { + if (!lu_fid_eq(cfid, &tfid)) { + exist = false; + lfsck_unlock(pllh); + goto again; + } + + exist = true; + } else if (rc != -ENOENT) { + GOTO(log, rc); + } else { + exist = false; + } + cname->ln_name = info->lti_key; cname->ln_namelen = namelen; rc = linkea_data_new(&ldata, &info->lti_linkea_buf2); @@ -964,8 +982,8 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env, GOTO(log, rc); rc = lfsck_ibits_lock(env, lfsck, orphan, &clh, - MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP, - LCK_EX); + MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_XATTR, LCK_EX); if (rc != 0) GOTO(log, rc); @@ -1080,7 +1098,7 @@ stop: log: lfsck_ibits_unlock(&clh, LCK_EX); - lfsck_ibits_unlock(&plh, LCK_EX); + lfsck_unlock(pllh); CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the " "object "DFID", name = %s: rc = %d\n", lfsck_lfsck2name(lfsck), PFID(cfid), @@ -1132,7 +1150,7 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, struct dt_object *pobj = NULL; struct dt_object *cobj = NULL; struct thandle *th = NULL; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &info->lti_llh; int rc = 0; ENTRY; @@ -1153,9 +1171,8 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env, if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(log, rc = 1); - /* Hold update lock on the pobj to prevent others to access. */ - rc = lfsck_ibits_lock(env, lfsck, pobj, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, parent, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); @@ -1217,7 +1234,7 @@ stop: dt_trans_stop(env, dev, th); unlock: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); log: CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with " @@ -1272,7 +1289,7 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, struct dt_device *dev = lfsck_obj2dev(orphan); struct dt_object *parent = NULL; struct thandle *th = NULL; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &info->lti_llh; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; struct lu_buf lmv_buf; @@ -1319,22 +1336,35 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env, if (IS_ERR(dev)) GOTO(log, rc = PTR_ERR(dev)); - /* Hold update lock on the parent to prevent others to access. */ - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); - if (rc != 0) - GOTO(log, rc); - idx = 0; + +again: do { namelen = snprintf(name, 31, DFID"-P-%d", PFID(cfid), idx++); rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, (const struct dt_key *)name, BYPASS_CAPA); if (rc != 0 && rc != -ENOENT) - GOTO(unlock1, rc); + GOTO(log, rc); } while (rc == 0); + rc = lfsck_lock(env, lfsck, parent, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); + if (rc != 0) + GOTO(log, rc); + + /* Re-check whether the name conflict with othrs after taken + * the ldlm lock. */ + rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, + (const struct dt_key *)name, BYPASS_CAPA); + if (unlikely(rc == 0)) { + lfsck_unlock(llh); + goto again; + } + + if (rc != -ENOENT) + GOTO(unlock1, rc); + cname->ln_name = name; cname->ln_namelen = namelen; @@ -1477,7 +1507,7 @@ stop: dt_trans_stop(env, dev, th); unlock1: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); log: CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan dir for " @@ -1533,8 +1563,8 @@ static int lfsck_namespace_shrink_linkea(const struct lu_env *env, ENTRY; rc = lfsck_ibits_lock(env, lfsck, obj, &lh, - MDS_INODELOCK_UPDATE | - MDS_INODELOCK_XATTR, LCK_EX); + MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, + LCK_EX); if (rc != 0) GOTO(log, rc); @@ -1649,20 +1679,21 @@ static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env, struct lu_name *cname, struct lu_fid *pfid) { - struct lu_fid *cfid = &lfsck_env_info(env)->lti_fid3; - struct lustre_handle lh = { 0 }; - int rc; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_fid *cfid = &info->lti_fid3; + struct lfsck_lock_handle *llh = &info->lti_llh; + int rc; ENTRY; - rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, com->lc_lfsck, parent, cname->ln_name, llh, + MDS_INODELOCK_UPDATE, LCK_PR); if (rc != 0) RETURN(rc); dt_read_lock(env, parent, 0); if (unlikely(lfsck_is_dead_obj(parent))) { dt_read_unlock(env, parent); - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname, pfid, true); @@ -1681,7 +1712,7 @@ static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env, * has removed the specified linkEA entry by race, then it is OK, * because the subsequent lfsck_namespace_shrink_linkea() can handle * such case. */ - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); if (rc == -ENOENT) { rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname, pfid, true); @@ -1745,7 +1776,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, const char *name = cname->ln_name; struct dt_object *pobj = NULL; struct dt_object *cobj = NULL; - struct lustre_handle plh = { 0 }; + struct lfsck_lock_handle *pllh = &info->lti_llh; struct lustre_handle clh = { 0 }; struct linkea_data ldata = { NULL }; struct thandle *th = NULL; @@ -1763,9 +1794,8 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, if (unlikely(!dt_try_as_dir(env, pobj))) GOTO(log, rc = -ENOTDIR); - /* Hold update lock on the pobj to prevent others to access. */ - rc = lfsck_ibits_lock(env, lfsck, pobj, &plh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, parent, name, pllh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); @@ -1807,7 +1837,8 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, /* lock the object to be destroyed. */ rc = lfsck_ibits_lock(env, lfsck, cobj, &clh, MDS_INODELOCK_UPDATE | - MDS_INODELOCK_XATTR, LCK_EX); + MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, + LCK_EX); if (rc != 0) GOTO(log, rc); @@ -1900,7 +1931,7 @@ stop: log: lfsck_ibits_unlock(&clh, LCK_EX); - lfsck_ibits_unlock(&plh, LCK_EX); + lfsck_unlock(pllh); if (cobj != NULL && !IS_ERR(cobj)) lfsck_object_put(env, cobj); @@ -2016,21 +2047,27 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, const char *name, const char *name2, __u16 type, bool update, bool dec) { - struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec; - const struct lu_fid *cfid = lfsck_dto2fid(child); - struct lu_fid tfid; - struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck_obj2dev(parent); - struct thandle *th = NULL; - struct lustre_handle lh = { 0 }; - int rc = 0; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct dt_insert_rec *rec = &info->lti_dt_rec; + const struct lu_fid *cfid = lfsck_dto2fid(child); + struct lu_fid tfid; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dev = lfsck->li_next; + struct thandle *th = NULL; + struct lfsck_lock_handle *llh = &info->lti_llh; + struct lustre_handle lh = { 0 }; + int rc = 0; ENTRY; if (unlikely(!dt_try_as_dir(env, parent))) GOTO(log, rc = -ENOTDIR); - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + if (!update || strcmp(name, name2) == 0) + rc = lfsck_lock(env, lfsck, parent, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); + else + rc = lfsck_ibits_lock(env, lfsck, parent, &lh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); @@ -2116,7 +2153,9 @@ stop: LNTF_CHECK_LINKEA, true); unlock1: - lfsck_ibits_unlock(&lh, LCK_EX); + /* It is harmless even if unlock the unused lock_handle */ + lfsck_ibits_unlock(&lh, LCK_PW); + lfsck_unlock(llh); log: CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name " @@ -2886,8 +2925,7 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, LASSERT(S_ISREG(lfsck_object_type(obj))); rc = lfsck_ibits_lock(env, lfsck, obj, &lh, - MDS_INODELOCK_UPDATE | - MDS_INODELOCK_XATTR, LCK_EX); + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); @@ -2953,7 +2991,7 @@ stop: dt_trans_stop(env, dev, th); log: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_ibits_unlock(&lh, LCK_PW); CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s " "nlink count from %u to %u: rc = %d\n", lfsck_lfsck2name(lfsck), PFID(cfid), old, la->la_nlink, rc); @@ -4733,7 +4771,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, struct dt_object *cobj = NULL; const struct lu_name *cname; struct linkea_data ldata = { NULL }; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &info->lti_llh; struct lu_buf linkea_buf; struct lu_buf lmv_buf; struct lfsck_instance *lfsck = com->lc_lfsck; @@ -4776,8 +4814,8 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, if (rc != 0) GOTO(log, rc); - rc = lfsck_ibits_lock(env, lfsck, pobj, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, parent, lnr->lnr_name, llh, + MDS_INODELOCK_UPDATE, LCK_PR); if (rc != 0) GOTO(log, rc); @@ -4923,7 +4961,7 @@ stop: dt_trans_stop(env, dev, th); log: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling " "reference for: parent "DFID", child "DFID", type %u, " "name %s. %s: rc = %d\n", lfsck_lfsck2name(lfsck), @@ -6317,16 +6355,17 @@ int lfsck_update_name_entry(const struct lu_env *env, struct dt_object *dir, const char *name, const struct lu_fid *fid, __u32 type) { - struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec; - struct dt_device *dev = lfsck_obj2dev(dir); - struct lustre_handle lh = { 0 }; - struct thandle *th; - int rc; - bool exists = true; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct dt_insert_rec *rec = &info->lti_dt_rec; + struct lfsck_lock_handle *llh = &info->lti_llh; + struct dt_device *dev = lfsck_obj2dev(dir); + struct thandle *th; + int rc; + bool exists = true; ENTRY; - rc = lfsck_ibits_lock(env, lfsck, dir, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, dir, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) RETURN(rc); @@ -6377,7 +6416,7 @@ stop: dt_trans_stop(env, dev, th); unlock: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(dir)), name, PFID(fid), type, rc); diff --git a/lustre/lfsck/lfsck_striped_dir.c b/lustre/lfsck/lfsck_striped_dir.c index 663bc55..3509629 100644 --- a/lustre/lfsck/lfsck_striped_dir.c +++ b/lustre/lfsck/lfsck_striped_dir.c @@ -2078,12 +2078,12 @@ repair: rc1 = lfsck_ibits_lock(env, lfsck, obj, &lh, MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, LCK_EX); - lfsck_ibits_unlock(&lh, LCK_EX); if (rc1 != 0) goto next; rc1 = lfsck_namespace_rebuild_linkea(env, com, obj, &ldata); + lfsck_ibits_unlock(&lh, LCK_EX); if (rc1 >= 0) { linkea_repaired = true; if (rc1 > 0) -- 1.8.3.1