From a6dafda245a4ee4c5a86f585914cf0dde87e420e Mon Sep 17 00:00:00 2001 From: Lai Siyao Date: Thu, 13 Dec 2018 00:04:12 +0800 Subject: [PATCH] LU-11681 lfsck: misc fixes for dangling entry repair * lock child in dangling entry repair. * if child is directory, create it as plain directory, because it may have shards already, which will be inserted later, besides, it may be remote, and creating striped directory remotely is not supported. Signed-off-by: Lai Siyao Change-Id: Ide0d90f99e6d24f4a9507d98bbc7ad7f876a907d Reviewed-on: https://review.whamcloud.com/33927 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Fan Yong Reviewed-by: Oleg Drokin --- lustre/lfsck/lfsck_internal.h | 4 ++ lustre/lfsck/lfsck_lib.c | 30 +++++++++++ lustre/lfsck/lfsck_namespace.c | 120 ++++++++++++++++++++++++++++++----------- 3 files changed, 124 insertions(+), 30 deletions(-) diff --git a/lustre/lfsck/lfsck_internal.h b/lustre/lfsck/lfsck_internal.h index 2756d9a2..428e28f 100644 --- a/lustre/lfsck/lfsck_internal.h +++ b/lustre/lfsck/lfsck_internal.h @@ -926,6 +926,10 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, struct dt_object *obj, struct lustre_handle *lh, __u64 bits, enum ldlm_mode mode); void lfsck_ibits_unlock(struct lustre_handle *lh, enum ldlm_mode mode); +int lfsck_remote_lookup_lock(const struct lu_env *env, + struct lfsck_instance *lfsck, + struct dt_object *pobj, struct dt_object *obj, + struct lustre_handle *lh, enum ldlm_mode mode); int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck, struct dt_object *obj, const char *name, struct lfsck_lock_handle *llh, __u64 bits, enum ldlm_mode mode); diff --git a/lustre/lfsck/lfsck_lib.c b/lustre/lfsck/lfsck_lib.c index cec8577..7e67555 100644 --- a/lustre/lfsck/lfsck_lib.c +++ b/lustre/lfsck/lfsck_lib.c @@ -432,6 +432,36 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, } /** + * Request the remote LOOKUP lock for the given object. + * + * If \a pobj is remote, the LOOKUP lock of \a obj is on the MDT where + * \a pobj is, acquire LOOKUP lock there. + * + * \param[in] env pointer to the thread context + * \param[in] lfsck pointer to the lfsck instance + * \param[in] pobj pointer to parent dt_object + * \param[in] obj pointer to the dt_object to be locked + * \param[out] lh pointer to the lock handle + * \param[in] mode the mode for the ldlm lock to be acquired + * + * \retval 0 for success + * \retval negative error number on failure + */ +int lfsck_remote_lookup_lock(const struct lu_env *env, + struct lfsck_instance *lfsck, + struct dt_object *pobj, struct dt_object *obj, + struct lustre_handle *lh, enum ldlm_mode mode) +{ + struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid; + + LASSERT(!lustre_handle_is_used(lh)); + + fid_build_reg_res_name(lfsck_dto2fid(obj), resid); + return __lfsck_ibits_lock(env, lfsck, pobj, resid, lh, + MDS_INODELOCK_LOOKUP, mode); +} + +/** * Release the the specified ibits lock. * * If the lock has been acquired before, release it diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index 6cd0819..553057d 100644 --- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -5109,26 +5109,28 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, struct dt_object *child, struct lfsck_namespace_req *lnr) { - struct lfsck_thread_info *info = lfsck_env_info(env); - struct lu_attr *la = &info->lti_la; - struct dt_allocation_hint *hint = &info->lti_hint; - struct dt_object_format *dof = &info->lti_dof; - struct dt_insert_rec *rec = &info->lti_dt_rec; - struct lmv_mds_md_v1 *lmv2 = &info->lti_lmv2; - const struct lu_name *cname; - const struct lu_fid *pfid = lfsck_dto2fid(parent); - const struct lu_fid *cfid = lfsck_dto2fid(child); - struct linkea_data ldata = { NULL }; - struct lfsck_lock_handle *llh = &info->lti_llh; - struct lu_buf linkea_buf; - struct lu_buf lmv_buf; - struct lfsck_instance *lfsck = com->lc_lfsck; - struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; - struct dt_device *dev = lfsck->li_next; - struct thandle *th = NULL; - int rc = 0; - __u16 type = lnr->lnr_type; - bool create; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_attr *la = &info->lti_la; + struct dt_allocation_hint *hint = &info->lti_hint; + struct dt_object_format *dof = &info->lti_dof; + struct dt_insert_rec *rec = &info->lti_dt_rec; + struct lmv_mds_md_v1 *lmv2 = &info->lti_lmv2; + const struct lu_name *cname; + const struct lu_fid *pfid = lfsck_dto2fid(parent); + const struct lu_fid *cfid = lfsck_dto2fid(child); + struct linkea_data ldata = { NULL }; + struct lfsck_lock_handle *llh = &info->lti_llh; + struct lustre_handle rlh = { 0 }; + struct lustre_handle clh = { 0 }; + struct lu_buf linkea_buf; + struct lu_buf lmv_buf; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; + struct dt_device *dev = lfsck->li_next; + struct thandle *th = NULL; + int rc = 0; + __u16 type = lnr->lnr_type; + bool create; ENTRY; cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen); @@ -5160,7 +5162,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, GOTO(log, rc); rc = lfsck_lock(env, lfsck, parent, lnr->lnr_name, llh, - MDS_INODELOCK_UPDATE, LCK_PR); + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) GOTO(log, rc); @@ -5168,17 +5170,38 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, if (rc != 0) GOTO(log, rc); + if (dt_object_remote(child)) { + rc = lfsck_remote_lookup_lock(env, lfsck, parent, child, &rlh, + LCK_EX); + if (rc != 0) + GOTO(log, rc); + } + + rc = lfsck_ibits_lock(env, lfsck, child, &clh, + MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_XATTR, LCK_EX); + if (rc != 0) + GOTO(unlock_remote_lookup, rc); + /* Set the ctime as zero, then others can know it is created for * repairing dangling name entry by LFSCK. And if the LFSCK made * wrong decision and the real MDT-object has been found later, * then the LFSCK has chance to fix the incosistency properly. */ memset(la, 0, sizeof(*la)); - la->la_mode = (type & S_IFMT) | 0600; - la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID | - LA_ATIME | LA_MTIME | LA_CTIME; + if (S_ISDIR(type)) + la->la_mode = (type & S_IFMT) | 0700; + else + la->la_mode = (type & S_IFMT) | 0600; + la->la_valid = LA_TYPE | LA_MODE | LA_CTIME; - child->do_ops->do_ah_init(env, hint, parent, child, - la->la_mode & S_IFMT); + /* + * if it's directory, skip do_ah_init() to create a plain directory + * because it may have shards already, which will be inserted back + * later, besides, it may be remote, and creating stripe directory + * remotely is not supported. + */ + if (S_ISREG(type)) + child->do_ops->do_ah_init(env, hint, parent, child, type); memset(dof, 0, sizeof(*dof)); dof->dof_type = dt_mode_to_dft(type); @@ -5188,7 +5211,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, th = dt_trans_create(env, dev); if (IS_ERR(th)) - GOTO(log, rc = PTR_ERR(th)); + GOTO(unlock_child, rc = PTR_ERR(th)); /* 1a. create child. */ rc = dt_declare_create(env, child, la, hint, dof, th); @@ -5238,7 +5261,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, lfsck_lmv_header_cpu_to_le(lmv2, lmv2); lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2)); rc = dt_declare_xattr_set(env, child, &lmv_buf, - XATTR_NAME_LMV, 0, th); + XATTR_NAME_LMV".set", 0, th); if (rc != 0) GOTO(stop, rc); } @@ -5252,6 +5275,21 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); + /* 7a. if child is remote, delete and insert to generate local agent */ + if (dt_object_remote(child)) { + rc = dt_declare_delete(env, parent, + (const struct dt_key *)lnr->lnr_name, + th); + if (rc) + GOTO(stop, rc); + + rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec, + (const struct dt_key *)lnr->lnr_name, + th); + if (rc) + GOTO(stop, rc); + } + rc = dt_trans_start_local(env, dev, th); if (rc != 0) GOTO(stop, rc = (rc == -EEXIST ? 1 : rc)); @@ -5285,8 +5323,8 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, /* 5b. generate slave LMV EA. */ if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) { - rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV, - 0, th); + rc = dt_xattr_set(env, child, &lmv_buf, + XATTR_NAME_LMV".set", 0, th); if (rc != 0) GOTO(unlock, rc); } @@ -5295,6 +5333,23 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, /* 6b. insert linkEA for child. */ rc = dt_xattr_set(env, child, &linkea_buf, XATTR_NAME_LINK, 0, th); + if (rc) + GOTO(unlock, rc); + + /* 7b. if child is remote, delete and insert to generate local agent */ + if (dt_object_remote(child)) { + rc = dt_delete(env, parent, + (const struct dt_key *)lnr->lnr_name, th); + if (rc) + GOTO(unlock, rc); + + rec->rec_type = type; + rec->rec_fid = cfid; + rc = dt_insert(env, parent, (const struct dt_rec *)rec, + (const struct dt_key *)lnr->lnr_name, th); + if (rc) + GOTO(unlock, rc); + } GOTO(unlock, rc); @@ -5304,6 +5359,11 @@ unlock: stop: dt_trans_stop(env, dev, th); +unlock_child: + lfsck_ibits_unlock(&clh, LCK_EX); +unlock_remote_lookup: + if (dt_object_remote(child)) + lfsck_ibits_unlock(&rlh, LCK_EX); log: lfsck_unlock(llh); CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling " -- 1.8.3.1