Whamcloud - gitweb
LU-5682 lfsck: optimize ldlm lock used by LFSCK 66/12766/11
authorFan Yong <fan.yong@intel.com>
Tue, 25 Nov 2014 14:24:59 +0000 (22:24 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 25 Mar 2015 12:59:15 +0000 (12:59 +0000)
When LFSCK repairs some inconsistency, it needs to take related
ldlm lock(s) firstly to prevent concurrent modifications or purge
client side cache. Originally, to simply the implementation, the
LFSCK just simply acquires LCK_EX mode ibits lock(s) on related
object(s). But such coarse-grained lock policy may be not efficient
for some directory-based modification, such as insert name entry to
the directory.

This patch introduces lfsck PDO (Parallel Directory Operations) lock
for directory-based LFSCK modification, it only locks part of the
directory with the given <object, name> pairs, then allow others to
access or modify the different part(s) of the directory in parallel,
and also avoid to purge client-side cache unnecessarily.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I29bad81112c14e3aaecaa2b808e60ea74c10a702
Reviewed-on: http://review.whamcloud.com/12766
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c
lustre/lfsck/lfsck_striped_dir.c

index f21a7a7..6153b0c 100644 (file)
@@ -801,6 +801,13 @@ struct lfsck_assistant_data {
 
 #define LFSCK_TMPBUF_LEN       64
 
+struct lfsck_lock_handle {
+       struct lustre_handle    llh_pdo_lh;
+       struct lustre_handle    llh_reg_lh;
+       ldlm_mode_t             llh_pdo_mode;
+       ldlm_mode_t             llh_reg_mode;
+};
+
 struct lfsck_thread_info {
        struct lu_name          lti_name_const;
        struct lu_name          lti_name;
@@ -848,6 +855,7 @@ struct lfsck_thread_info {
        struct lmv_mds_md_v1    lti_lmv2;
        struct lmv_mds_md_v1    lti_lmv3;
        struct lmv_mds_md_v1    lti_lmv4;
+       struct lfsck_lock_handle lti_llh;
 };
 
 /* lfsck_lib.c */
@@ -857,6 +865,10 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
                     struct dt_object *obj, struct lustre_handle *lh,
                     __u64 bits, ldlm_mode_t mode);
 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode);
+int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
+              struct dt_object *obj, const char *name,
+              struct lfsck_lock_handle *llh, __u64 bits, ldlm_mode_t mode);
+void lfsck_unlock(struct lfsck_lock_handle *llh);
 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
                              struct lfsck_instance *lfsck,
                              const struct lu_fid *fid);
index 00ef9cf..207bfcf 100644 (file)
@@ -1766,7 +1766,7 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
        struct thandle                  *th     = NULL;
        struct lu_buf                   *ea_buf = &info->lti_big_buf;
        struct lu_buf                    lov_buf;
-       struct lustre_handle             lh     = { 0 };
+       struct lfsck_lock_handle        *llh    = &info->lti_llh;
        struct linkea_data               ldata  = { NULL };
        struct lu_buf                    linkea_buf;
        const struct lu_name            *pname;
@@ -1806,25 +1806,6 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
        LASSERT(infix != NULL);
        LASSERT(type != NULL);
 
-       do {
-               snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
-                        type, idx++);
-               rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
-                              (const struct dt_key *)name, BYPASS_CAPA);
-               if (rc != 0 && rc != -ENOENT)
-                       GOTO(log, rc);
-       } while (rc == 0);
-
-       rc = linkea_data_new(&ldata,
-                            &lfsck_env_info(env)->lti_linkea_buf);
-       if (rc != 0)
-               GOTO(log, rc);
-
-       pname = lfsck_name_get_const(env, name, strlen(name));
-       rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
-       if (rc != 0)
-               GOTO(log, rc);
-
        memset(la, 0, sizeof(*la));
        la->la_uid = rec->lor_uid;
        la->la_gid = rec->lor_gid;
@@ -1844,17 +1825,43 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
                        GOTO(log, rc = -ENOMEM);
        }
 
-       /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
-        *
-        * XXX: Currently, we do not grab the PDO lock as normal create cases,
-        *      because creating MDT-object for orphan OST-object is rare, we
-        *      do not much care about the performance. It can be improved in
-        *      the future when needed. */
-       rc = lfsck_ibits_lock(env, lfsck, lpf, &lh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
+again:
+       do {
+               snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
+                        type, idx++);
+               rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
+                              (const struct dt_key *)name, BYPASS_CAPA);
+               if (rc != 0 && rc != -ENOENT)
+                       GOTO(log, rc);
+       } while (rc == 0);
+
+       rc = lfsck_lock(env, lfsck, lfsck->li_lpf_obj, name, llh,
+                       MDS_INODELOCK_UPDATE, LCK_PW);
        if (rc != 0)
                GOTO(log, rc);
 
+       /* Re-check whether the name conflict with othrs after taken
+        * the ldlm lock. */
+       rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
+                      (const struct dt_key *)name, BYPASS_CAPA);
+       if (unlikely(rc == 0)) {
+               lfsck_unlock(llh);
+               goto again;
+       }
+
+       if (rc != -ENOENT)
+               GOTO(unlock, rc);
+
+       rc = linkea_data_new(&ldata,
+                            &lfsck_env_info(env)->lti_linkea_buf);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       pname = lfsck_name_get_const(env, name, strlen(name));
+       rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
+       if (rc != 0)
+               GOTO(unlock, rc);
+
        /* The 1st transaction. */
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
@@ -1920,7 +1927,7 @@ stop:
                dt_trans_stop(env, dev, th);
 
 unlock:
-       lfsck_ibits_unlock(&lh, LCK_EX);
+       lfsck_unlock(llh);
 
 log:
        if (cobj != NULL && !IS_ERR(cobj))
@@ -2136,7 +2143,6 @@ static int lfsck_layout_conflict_create(const struct lu_env *env,
        if (rc != 0)
                GOTO(out, rc);
 
-       /* Hold layout lock on the parent to prevent others to access. */
        rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
                              MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
                              LCK_EX);
index 6a9e7e0..62b1d8d 100644 (file)
@@ -361,29 +361,15 @@ int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
        RETURN(rc);
 }
 
-/**
- * Request the specified ibits lock for the given object.
- *
- * Before the LFSCK modifying on the namespace visible object,
- * it needs to acquire related ibits ldlm lock.
- *
- * \param[in] env      pointer to the thread context
- * \param[in] lfsck    pointer to the lfsck instance
- * \param[in] obj      pointer to the dt_object to be locked
- * \param[out] lh      pointer to the lock handle
- * \param[in] ibits    the bits for the ldlm lock to be acquired
- * \param[in] mode     the mode for the ldlm lock to be acquired
- *
- * \retval             0 for success
- * \retval             negative error number on failure
- */
-int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
-                    struct dt_object *obj, struct lustre_handle *lh,
-                    __u64 bits, ldlm_mode_t mode)
+static int __lfsck_ibits_lock(const struct lu_env *env,
+                             struct lfsck_instance *lfsck,
+                             struct dt_object *obj,
+                             struct ldlm_res_id *resid,
+                             struct lustre_handle *lh,
+                             __u64 bits, ldlm_mode_t mode)
 {
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
        ldlm_policy_data_t              *policy = &info->lti_policy;
-       struct ldlm_res_id              *resid  = &info->lti_resid;
        __u64                            flags  = LDLM_FL_ATOMIC_CB;
        int                              rc;
 
@@ -391,7 +377,6 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
 
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = bits;
-       fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
        if (dt_object_remote(obj)) {
                struct ldlm_enqueue_info *einfo = &info->lti_einfo;
 
@@ -422,6 +407,34 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
 }
 
 /**
+ * Request the specified ibits lock for the given object.
+ *
+ * Before the LFSCK modifying on the namespace visible object,
+ * it needs to acquire related ibits ldlm lock.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lfsck    pointer to the lfsck instance
+ * \param[in] obj      pointer to the dt_object to be locked
+ * \param[out] lh      pointer to the lock handle
+ * \param[in] bits     the bits for the ldlm lock to be acquired
+ * \param[in] mode     the mode for the ldlm lock to be acquired
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
+                    struct dt_object *obj, struct lustre_handle *lh,
+                    __u64 bits, ldlm_mode_t mode)
+{
+       struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
+
+       LASSERT(!lustre_handle_is_used(lh));
+
+       fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
+       return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode);
+}
+
+/**
  * Release the the specified ibits lock.
  *
  * If the lock has been acquired before, release it
@@ -438,6 +451,86 @@ void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
        }
 }
 
+/**
+ * Request compound ibits locks for the given <obj, name> pairs.
+ *
+ * Before the LFSCK modifying on the namespace visible object, it needs to
+ * acquire related ibits ldlm lock. Usually, we can use lfsck_ibits_lock for
+ * the lock purpose. But the simple lfsck_ibits_lock for directory-based
+ * modificationis (such as insert name entry to the directory) may be too
+ * coarse-grained and not efficient.
+ *
+ * The lfsck_lock() will request compound ibits locks on the specified
+ * <obj, name> pairs: the PDO (Parallel Directory Operations) ibits (UPDATE)
+ * lock on the directory object, and the regular ibits lock on the name hash.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lfsck    pointer to the lfsck instance
+ * \param[in] obj      pointer to the dt_object to be locked
+ * \param[in] name     used for building the PDO lock resource
+ * \param[out] llh     pointer to the lfsck_lock_handle
+ * \param[in] bits     the bits for the ldlm lock to be acquired
+ * \param[in] mode     the mode for the ldlm lock to be acquired
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
+                  struct dt_object *obj, const char *name,
+                  struct lfsck_lock_handle *llh, __u64 bits, ldlm_mode_t mode)
+{
+       struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
+       int                 rc;
+
+       LASSERT(S_ISDIR(lfsck_object_type(obj)));
+       LASSERT(name != NULL);
+       LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh));
+       LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh));
+
+       switch (mode) {
+       case LCK_EX:
+               llh->llh_pdo_mode = LCK_EX;
+               break;
+       case LCK_PW:
+               llh->llh_pdo_mode = LCK_CW;
+               break;
+       case LCK_PR:
+               llh->llh_pdo_mode = LCK_CR;
+               break;
+       default:
+               CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj "
+                      DFID"\n", lfsck_lfsck2name(lfsck), mode,
+                      PFID(lfsck_dto2fid(obj)));
+               LBUG();
+       }
+
+       fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
+       rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh,
+                               MDS_INODELOCK_UPDATE, llh->llh_pdo_mode);
+       if (rc != 0)
+               return rc;
+
+       llh->llh_reg_mode = mode;
+       resid->name[LUSTRE_RES_ID_HSH_OFF] = full_name_hash(name, strlen(name));
+       rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh,
+                               bits, llh->llh_reg_mode);
+       if (rc != 0)
+               lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
+
+       return rc;
+}
+
+/**
+ * Release the the compound ibits locks.
+ *
+ * \param[in] llh      pointer to the lfsck_lock_handle to be released
+ */
+void lfsck_unlock(struct lfsck_lock_handle *llh)
+{
+       lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode);
+       lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
+}
+
 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
                              struct lfsck_instance *lfsck,
                              const struct lu_fid *fid)
@@ -480,12 +573,12 @@ static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
        struct dt_object        *parent = lfsck->li_lpf_root_obj;
        struct dt_device        *dev    = lfsck_obj2dev(parent);
        struct thandle          *th;
-       struct lustre_handle     lh     = { 0 };
+       struct lfsck_lock_handle *llh   = &lfsck_env_info(env)->lti_llh;
        int                      rc;
        ENTRY;
 
-       rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
+       rc = lfsck_lock(env, lfsck, parent, name, llh,
+                       MDS_INODELOCK_UPDATE, LCK_PW);
        if (rc != 0)
                RETURN(rc);
 
@@ -520,7 +613,7 @@ stop:
        dt_trans_stop(env, dev, th);
 
 unlock:
-       lfsck_ibits_unlock(&lh, LCK_EX);
+       lfsck_unlock(llh);
 
        CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
               lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
@@ -882,7 +975,7 @@ static int lfsck_create_lpf(const struct lu_env *env,
        struct dt_object_format  *dof   = &info->lti_dof;
        struct dt_object         *parent = lfsck->li_lpf_root_obj;
        struct dt_object         *child = NULL;
-       struct lustre_handle      lh    = { 0 };
+       struct lfsck_lock_handle *llh   = &info->lti_llh;
        char                      name[8];
        int                       node  = lfsck_dev_idx(lfsck);
        int                       rc    = 0;
@@ -892,8 +985,8 @@ static int lfsck_create_lpf(const struct lu_env *env,
        LASSERT(parent != NULL);
        LASSERT(lfsck->li_lpf_obj == NULL);
 
-       rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
+       rc = lfsck_lock(env, lfsck, parent, name, llh,
+                       MDS_INODELOCK_UPDATE, LCK_PW);
        if (rc != 0)
                RETURN(rc);
 
@@ -951,7 +1044,7 @@ static int lfsck_create_lpf(const struct lu_env *env,
        GOTO(unlock, rc);
 
 unlock:
-       lfsck_ibits_unlock(&lh, LCK_EX);
+       lfsck_unlock(llh);
        if (rc != 0 && child != NULL && !IS_ERR(child))
                lfsck_object_put(env, child);
 
index 585ab52..7755e33 100644 (file)
@@ -917,7 +917,7 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env,
        struct dt_device                *dev    = lfsck_obj2dev(orphan);
        struct dt_object                *parent;
        struct thandle                  *th     = NULL;
-       struct lustre_handle             plh    = { 0 };
+       struct lfsck_lock_handle        *pllh   = &info->lti_llh;
        struct lustre_handle             clh    = { 0 };
        struct linkea_data               ldata  = { NULL };
        struct lu_buf                    linkea_buf;
@@ -934,12 +934,7 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env,
        parent = lfsck->li_lpf_obj;
        pfid = lfsck_dto2fid(parent);
 
-       /* Hold update lock on the parent to prevent others to access. */
-       rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
-       if (rc != 0)
-               GOTO(log, rc);
-
+again:
        do {
                namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
                                   PFID(cfid), infix, type, idx++);
@@ -953,6 +948,29 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env,
                        exist = true;
        } while (rc == 0 && !exist);
 
+       rc = lfsck_lock(env, lfsck, parent, info->lti_key, pllh,
+                       MDS_INODELOCK_UPDATE, LCK_PW);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       /* Re-check whether the name conflict with othrs after taken
+        * the ldlm lock. */
+       rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
+                      (const struct dt_key *)info->lti_key, BYPASS_CAPA);
+       if (rc == 0) {
+               if (!lu_fid_eq(cfid, &tfid)) {
+                       exist = false;
+                       lfsck_unlock(pllh);
+                       goto again;
+               }
+
+               exist = true;
+       } else if (rc != -ENOENT) {
+               GOTO(log, rc);
+       } else {
+               exist = false;
+       }
+
        cname->ln_name = info->lti_key;
        cname->ln_namelen = namelen;
        rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
@@ -964,8 +982,8 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env,
                GOTO(log, rc);
 
        rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
-                             MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
-                             LCK_EX);
+                             MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP |
+                             MDS_INODELOCK_XATTR, LCK_EX);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -1080,7 +1098,7 @@ stop:
 
 log:
        lfsck_ibits_unlock(&clh, LCK_EX);
-       lfsck_ibits_unlock(&plh, LCK_EX);
+       lfsck_unlock(pllh);
        CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
               "object "DFID", name = %s: rc = %d\n",
               lfsck_lfsck2name(lfsck), PFID(cfid),
@@ -1132,7 +1150,7 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env,
        struct dt_object                *pobj   = NULL;
        struct dt_object                *cobj   = NULL;
        struct thandle                  *th     = NULL;
-       struct lustre_handle             lh     = { 0 };
+       struct lfsck_lock_handle        *llh    = &info->lti_llh;
        int                              rc     = 0;
        ENTRY;
 
@@ -1153,9 +1171,8 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env,
        if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
                GOTO(log, rc = 1);
 
-       /* Hold update lock on the pobj to prevent others to access. */
-       rc = lfsck_ibits_lock(env, lfsck, pobj, &lh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
+       rc = lfsck_lock(env, lfsck, parent, name, llh,
+                       MDS_INODELOCK_UPDATE, LCK_PW);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -1217,7 +1234,7 @@ stop:
        dt_trans_stop(env, dev, th);
 
 unlock:
-       lfsck_ibits_unlock(&lh, LCK_EX);
+       lfsck_unlock(llh);
 
 log:
        CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with "
@@ -1272,7 +1289,7 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env,
        struct dt_device                *dev    = lfsck_obj2dev(orphan);
        struct dt_object                *parent = NULL;
        struct thandle                  *th     = NULL;
-       struct lustre_handle             lh     = { 0 };
+       struct lfsck_lock_handle        *llh    = &info->lti_llh;
        struct linkea_data               ldata  = { NULL };
        struct lu_buf                    linkea_buf;
        struct lu_buf                    lmv_buf;
@@ -1319,22 +1336,35 @@ static int lfsck_namespace_create_orphan_dir(const struct lu_env *env,
        if (IS_ERR(dev))
                GOTO(log, rc = PTR_ERR(dev));
 
-       /* Hold update lock on the parent to prevent others to access. */
-       rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
-       if (rc != 0)
-               GOTO(log, rc);
-
        idx = 0;
+
+again:
        do {
                namelen = snprintf(name, 31, DFID"-P-%d",
                                   PFID(cfid), idx++);
                rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                               (const struct dt_key *)name, BYPASS_CAPA);
                if (rc != 0 && rc != -ENOENT)
-                       GOTO(unlock1, rc);
+                       GOTO(log, rc);
        } while (rc == 0);
 
+       rc = lfsck_lock(env, lfsck, parent, name, llh,
+                       MDS_INODELOCK_UPDATE, LCK_PW);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       /* Re-check whether the name conflict with othrs after taken
+        * the ldlm lock. */
+       rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
+                      (const struct dt_key *)name, BYPASS_CAPA);
+       if (unlikely(rc == 0)) {
+               lfsck_unlock(llh);
+               goto again;
+       }
+
+       if (rc != -ENOENT)
+               GOTO(unlock1, rc);
+
        cname->ln_name = name;
        cname->ln_namelen = namelen;
 
@@ -1477,7 +1507,7 @@ stop:
        dt_trans_stop(env, dev, th);
 
 unlock1:
-       lfsck_ibits_unlock(&lh, LCK_EX);
+       lfsck_unlock(llh);
 
 log:
        CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan dir for "
@@ -1533,8 +1563,8 @@ static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
        ENTRY;
 
        rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
-                             MDS_INODELOCK_UPDATE |
-                             MDS_INODELOCK_XATTR, LCK_EX);
+                             MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
+                             LCK_EX);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -1649,20 +1679,21 @@ static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
                                              struct lu_name *cname,
                                              struct lu_fid *pfid)
 {
-       struct lu_fid           *cfid   = &lfsck_env_info(env)->lti_fid3;
-       struct lustre_handle     lh     = { 0 };
-       int                      rc;
+       struct lfsck_thread_info *info  = lfsck_env_info(env);
+       struct lu_fid            *cfid  = &info->lti_fid3;
+       struct lfsck_lock_handle *llh   = &info->lti_llh;
+       int                       rc;
        ENTRY;
 
-       rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
+       rc = lfsck_lock(env, com->lc_lfsck, parent, cname->ln_name, llh,
+                       MDS_INODELOCK_UPDATE, LCK_PR);
        if (rc != 0)
                RETURN(rc);
 
        dt_read_lock(env, parent, 0);
        if (unlikely(lfsck_is_dead_obj(parent))) {
                dt_read_unlock(env, parent);
-               lfsck_ibits_unlock(&lh, LCK_EX);
+               lfsck_unlock(llh);
                rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
                                                   cname, pfid, true);
 
@@ -1681,7 +1712,7 @@ static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
         * has removed the specified linkEA entry by race, then it is OK,
         * because the subsequent lfsck_namespace_shrink_linkea() can handle
         * such case. */
-       lfsck_ibits_unlock(&lh, LCK_EX);
+       lfsck_unlock(llh);
        if (rc == -ENOENT) {
                rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
                                                   cname, pfid, true);
@@ -1745,7 +1776,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
        const char                      *name   = cname->ln_name;
        struct dt_object                *pobj   = NULL;
        struct dt_object                *cobj   = NULL;
-       struct lustre_handle             plh    = { 0 };
+       struct lfsck_lock_handle        *pllh   = &info->lti_llh;
        struct lustre_handle             clh    = { 0 };
        struct linkea_data               ldata  = { NULL };
        struct thandle                  *th     = NULL;
@@ -1763,9 +1794,8 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
        if (unlikely(!dt_try_as_dir(env, pobj)))
                GOTO(log, rc = -ENOTDIR);
 
-       /* Hold update lock on the pobj to prevent others to access. */
-       rc = lfsck_ibits_lock(env, lfsck, pobj, &plh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
+       rc = lfsck_lock(env, lfsck, parent, name, pllh,
+                       MDS_INODELOCK_UPDATE, LCK_PW);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -1807,7 +1837,8 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
        /* lock the object to be destroyed. */
        rc = lfsck_ibits_lock(env, lfsck, cobj, &clh,
                              MDS_INODELOCK_UPDATE |
-                             MDS_INODELOCK_XATTR, LCK_EX);
+                             MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
+                             LCK_EX);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -1900,7 +1931,7 @@ stop:
 
 log:
        lfsck_ibits_unlock(&clh, LCK_EX);
-       lfsck_ibits_unlock(&plh, LCK_EX);
+       lfsck_unlock(pllh);
 
        if (cobj != NULL && !IS_ERR(cobj))
                lfsck_object_put(env, cobj);
@@ -2016,21 +2047,27 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env,
                                  const char *name, const char *name2,
                                  __u16 type, bool update, bool dec)
 {
-       struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
-       const struct lu_fid     *cfid   = lfsck_dto2fid(child);
-       struct lu_fid            tfid;
-       struct lfsck_instance   *lfsck  = com->lc_lfsck;
-       struct dt_device        *dev    = lfsck_obj2dev(parent);
-       struct thandle          *th     = NULL;
-       struct lustre_handle     lh     = { 0 };
-       int                      rc     = 0;
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct dt_insert_rec            *rec    = &info->lti_dt_rec;
+       const struct lu_fid             *cfid   = lfsck_dto2fid(child);
+       struct lu_fid                    tfid;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck->li_next;
+       struct thandle                  *th     = NULL;
+       struct lfsck_lock_handle        *llh    = &info->lti_llh;
+       struct lustre_handle             lh     = { 0 };
+       int                              rc     = 0;
        ENTRY;
 
        if (unlikely(!dt_try_as_dir(env, parent)))
                GOTO(log, rc = -ENOTDIR);
 
-       rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
+       if (!update || strcmp(name, name2) == 0)
+               rc = lfsck_lock(env, lfsck, parent, name, llh,
+                               MDS_INODELOCK_UPDATE, LCK_PW);
+       else
+               rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
+                                     MDS_INODELOCK_UPDATE, LCK_PW);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -2116,7 +2153,9 @@ stop:
                                             LNTF_CHECK_LINKEA, true);
 
 unlock1:
-       lfsck_ibits_unlock(&lh, LCK_EX);
+       /* It is harmless even if unlock the unused lock_handle */
+       lfsck_ibits_unlock(&lh, LCK_PW);
+       lfsck_unlock(llh);
 
 log:
        CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name "
@@ -2886,8 +2925,7 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env,
        LASSERT(S_ISREG(lfsck_object_type(obj)));
 
        rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
-                             MDS_INODELOCK_UPDATE |
-                             MDS_INODELOCK_XATTR, LCK_EX);
+                             MDS_INODELOCK_UPDATE, LCK_PW);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -2953,7 +2991,7 @@ stop:
        dt_trans_stop(env, dev, th);
 
 log:
-       lfsck_ibits_unlock(&lh, LCK_EX);
+       lfsck_ibits_unlock(&lh, LCK_PW);
        CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s "
               "nlink count from %u to %u: rc = %d\n",
               lfsck_lfsck2name(lfsck), PFID(cfid), old, la->la_nlink, rc);
@@ -4733,7 +4771,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env,
        struct dt_object                *cobj   = NULL;
        const struct lu_name            *cname;
        struct linkea_data               ldata  = { NULL };
-       struct lustre_handle             lh     = { 0 };
+       struct lfsck_lock_handle        *llh    = &info->lti_llh;
        struct lu_buf                    linkea_buf;
        struct lu_buf                    lmv_buf;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
@@ -4776,8 +4814,8 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env,
        if (rc != 0)
                GOTO(log, rc);
 
-       rc = lfsck_ibits_lock(env, lfsck, pobj, &lh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
+       rc = lfsck_lock(env, lfsck, parent, lnr->lnr_name, llh,
+                       MDS_INODELOCK_UPDATE, LCK_PR);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -4923,7 +4961,7 @@ stop:
        dt_trans_stop(env, dev, th);
 
 log:
-       lfsck_ibits_unlock(&lh, LCK_EX);
+       lfsck_unlock(llh);
        CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling "
               "reference for: parent "DFID", child "DFID", type %u, "
               "name %s. %s: rc = %d\n", lfsck_lfsck2name(lfsck),
@@ -6317,16 +6355,17 @@ int lfsck_update_name_entry(const struct lu_env *env,
                            struct dt_object *dir, const char *name,
                            const struct lu_fid *fid, __u32 type)
 {
-       struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
-       struct dt_device        *dev    = lfsck_obj2dev(dir);
-       struct lustre_handle     lh     = { 0 };
-       struct thandle          *th;
-       int                      rc;
-       bool                     exists = true;
+       struct lfsck_thread_info *info   = lfsck_env_info(env);
+       struct dt_insert_rec     *rec    = &info->lti_dt_rec;
+       struct lfsck_lock_handle *llh    = &info->lti_llh;
+       struct dt_device         *dev    = lfsck_obj2dev(dir);
+       struct thandle           *th;
+       int                       rc;
+       bool                      exists = true;
        ENTRY;
 
-       rc = lfsck_ibits_lock(env, lfsck, dir, &lh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
+       rc = lfsck_lock(env, lfsck, dir, name, llh,
+                       MDS_INODELOCK_UPDATE, LCK_PW);
        if (rc != 0)
                RETURN(rc);
 
@@ -6377,7 +6416,7 @@ stop:
        dt_trans_stop(env, dev, th);
 
 unlock:
-       lfsck_ibits_unlock(&lh, LCK_EX);
+       lfsck_unlock(llh);
        CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
               " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
               PFID(lfsck_dto2fid(dir)), name, PFID(fid), type, rc);
index 663bc55..3509629 100644 (file)
@@ -2078,12 +2078,12 @@ repair:
                        rc1 = lfsck_ibits_lock(env, lfsck, obj, &lh,
                                               MDS_INODELOCK_UPDATE |
                                               MDS_INODELOCK_XATTR, LCK_EX);
-                       lfsck_ibits_unlock(&lh, LCK_EX);
                        if (rc1 != 0)
                                goto next;
 
                        rc1 = lfsck_namespace_rebuild_linkea(env, com, obj,
                                                             &ldata);
+                       lfsck_ibits_unlock(&lh, LCK_EX);
                        if (rc1 >= 0) {
                                linkea_repaired = true;
                                if (rc1 > 0)