Whamcloud - gitweb
LU-4788 lfsck: take ldlm lock before modifying visible object 86/10986/11
authorFan Yong <fan.yong@intel.com>
Sun, 13 Jul 2014 12:14:10 +0000 (20:14 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 5 Sep 2014 00:24:17 +0000 (00:24 +0000)
Before the LFSCK modifying on the namespace visible object,
it needs to acquire related ibits lock to pervent the client
to cache stale information. The .lustre/lost+found/ and its
sub-directories also needs related ldlm locks.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I439e02e1b7b24e87e7e6e25c5084f1c98116e7f7
Reviewed-on: http://review.whamcloud.com/10986
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c

index 93d960b..f9bd20e 100644 (file)
@@ -581,6 +581,10 @@ struct lfsck_thread_info {
 /* lfsck_lib.c */
 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
                    struct lu_fid *fid, bool locked);
+int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
+                    struct dt_object *obj, struct lustre_handle *lh,
+                    __u64 bits, ldlm_mode_t mode);
+void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode);
 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck);
 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
                                           bool unlink);
index 3db63dc..2ea8a49 100644 (file)
@@ -1625,45 +1625,6 @@ static int lfsck_layout_double_scan_result(const struct lu_env *env,
        return rc;
 }
 
-static int lfsck_layout_lock(const struct lu_env *env,
-                            struct lfsck_component *com,
-                            struct dt_object *obj,
-                            struct lustre_handle *lh, __u64 bits)
-{
-       struct lfsck_thread_info        *info   = lfsck_env_info(env);
-       ldlm_policy_data_t              *policy = &info->lti_policy;
-       struct ldlm_res_id              *resid  = &info->lti_resid;
-       struct lfsck_instance           *lfsck  = com->lc_lfsck;
-       __u64                            flags  = LDLM_FL_ATOMIC_CB;
-       int                              rc;
-
-       LASSERT(lfsck->li_namespace != NULL);
-
-       memset(policy, 0, sizeof(*policy));
-       policy->l_inodebits.bits = bits;
-       fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
-       rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
-                                   policy, LCK_EX, &flags, ldlm_blocking_ast,
-                                   ldlm_completion_ast, NULL, NULL, 0,
-                                   LVB_T_NONE, NULL, lh);
-       if (rc == ELDLM_OK) {
-               rc = 0;
-       } else {
-               memset(lh, 0, sizeof(*lh));
-               rc = -EIO;
-       }
-
-       return rc;
-}
-
-static void lfsck_layout_unlock(struct lustre_handle *lh)
-{
-       if (lustre_handle_is_used(lh)) {
-               ldlm_lock_decref(lh, LCK_EX);
-               memset(lh, 0, sizeof(*lh));
-       }
-}
-
 static int lfsck_layout_trans_stop(const struct lu_env *env,
                                   struct dt_device *dev,
                                   struct thandle *handle, int result)
@@ -2068,8 +2029,8 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
         *      because creating MDT-object for orphan OST-object is rare, we
         *      do not much care about the performance. It can be improved in
         *      the future when needed. */
-       rc = lfsck_layout_lock(env, com, lfsck->li_lpf_obj, &lh,
-                              MDS_INODELOCK_UPDATE);
+       rc = lfsck_ibits_lock(env, lfsck, lfsck->li_lpf_obj, &lh,
+                             MDS_INODELOCK_UPDATE, LCK_EX);
        if (rc != 0)
                GOTO(put, rc);
 
@@ -2157,7 +2118,7 @@ stop:
        dt_trans_stop(env, next, th);
 
 unlock:
-       lfsck_layout_unlock(&lh);
+       lfsck_ibits_unlock(&lh, LCK_EX);
 
 put:
        if (cobj != NULL && !IS_ERR(cobj))
@@ -2376,8 +2337,9 @@ static int lfsck_layout_conflict_create(const struct lu_env *env,
                GOTO(out, rc);
 
        /* Hold layout lock on the parent to prevent others to access. */
-       rc = lfsck_layout_lock(env, com, parent, &lh,
-                              MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
+                             MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
+                             LCK_EX);
        if (rc != 0)
                GOTO(out, rc);
 
@@ -2389,7 +2351,7 @@ static int lfsck_layout_conflict_create(const struct lu_env *env,
         * a new MDT-object for the orphan OST-object. */
        if (rc == -ETXTBSY) {
                /* No need the layout lock on the original parent. */
-               lfsck_layout_unlock(&lh);
+               lfsck_ibits_unlock(&lh, LCK_EX);
 
                fid_zero(&rec->lor_fid);
                snprintf(infix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
@@ -2428,7 +2390,7 @@ stop:
        dt_trans_stop(env, dev, th);
 
 unlock:
-       lfsck_layout_unlock(&lh);
+       lfsck_ibits_unlock(&lh, LCK_EX);
 
 out:
        CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
@@ -2475,8 +2437,9 @@ static int lfsck_layout_recreate_lovea(const struct lu_env *env,
        bool                      locked        = false;
        ENTRY;
 
-       rc = lfsck_layout_lock(env, com, parent, &lh,
-                              MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
+                             MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
+                             LCK_EX);
        if (rc != 0) {
                CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
                       "LOV EA for "DFID": parent "DFID", OST-index %u, "
@@ -2669,7 +2632,7 @@ again:
                                dt_write_unlock(env, parent);
                                if (handle != NULL)
                                        dt_trans_stop(env, dt, handle);
-                               lfsck_layout_unlock(&lh);
+                               lfsck_ibits_unlock(&lh, LCK_EX);
                                rc = lfsck_layout_update_pfid(env, com, parent,
                                                        cfid, ltd->ltd_tgt, i);
 
@@ -2694,7 +2657,7 @@ again:
        dt_write_unlock(env, parent);
        if (handle != NULL)
                dt_trans_stop(env, dt, handle);
-       lfsck_layout_unlock(&lh);
+       lfsck_ibits_unlock(&lh, LCK_EX);
        if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
                objs = &lmm->lmm_objects[ea_off];
        else
@@ -2713,7 +2676,7 @@ stop:
                dt_trans_stop(env, dt, handle);
 
 unlock_layout:
-       lfsck_layout_unlock(&lh);
+       lfsck_ibits_unlock(&lh, LCK_EX);
 
        return rc;
 }
@@ -2937,8 +2900,9 @@ static int lfsck_layout_repair_dangling(const struct lu_env *env,
        cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
                        LA_ATIME | LA_MTIME | LA_CTIME;
 
-       rc = lfsck_layout_lock(env, com, parent, &lh,
-                              MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
+                             MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
+                             LCK_EX);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -2989,7 +2953,7 @@ stop:
        rc = lfsck_layout_trans_stop(env, dev, handle, rc);
 
 unlock1:
-       lfsck_layout_unlock(&lh);
+       lfsck_ibits_unlock(&lh, LCK_EX);
 
 log:
        CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found dangling "
@@ -3025,8 +2989,9 @@ static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
        int                              rc;
        ENTRY;
 
-       rc = lfsck_layout_lock(env, com, parent, &lh,
-                              MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
+                             MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
+                             LCK_EX);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -3083,7 +3048,7 @@ stop:
        rc = lfsck_layout_trans_stop(env, dev, handle, rc);
 
 unlock1:
-       lfsck_layout_unlock(&lh);
+       lfsck_ibits_unlock(&lh, LCK_EX);
 
 log:
        CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired unmatched "
@@ -3124,8 +3089,9 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
        int                              rc;
        ENTRY;
 
-       rc = lfsck_layout_lock(env, com, parent, &lh,
-                              MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
+                             MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
+                             LCK_EX);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -3212,7 +3178,7 @@ stop:
        dt_trans_stop(env, pdev, handle);
 
 unlock1:
-       lfsck_layout_unlock(&lh);
+       lfsck_ibits_unlock(&lh, LCK_EX);
 
 log:
        CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired multiple "
@@ -4872,9 +4838,9 @@ again:
        if (!lustre_handle_is_used(&lh)) {
                dt_read_unlock(env, obj);
                locked = false;
-               rc = lfsck_layout_lock(env, com, obj, &lh,
-                                      MDS_INODELOCK_LAYOUT |
-                                      MDS_INODELOCK_XATTR);
+               rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
+                                     MDS_INODELOCK_LAYOUT |
+                                     MDS_INODELOCK_XATTR, LCK_EX);
                if (rc != 0)
                        GOTO(out, rc);
 
@@ -4920,7 +4886,7 @@ out:
        if (handle != NULL && !IS_ERR(handle))
                dt_trans_stop(env, dev, handle);
 
-       lfsck_layout_unlock(&lh);
+       lfsck_ibits_unlock(&lh, LCK_EX);
 
        if (bad_oi)
                CDEBUG(D_LFSCK, "%s: layout LFSCK master %s bad lmm_oi for "
index 145a84c..325f7e7 100644 (file)
@@ -350,6 +350,68 @@ int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
        RETURN(rc);
 }
 
+/**
+ * Request the specified ibits lock for the given object.
+ *
+ * Before the LFSCK modifying on the namespace visible object,
+ * it needs to acquire related ibits ldlm lock.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lfsck    pointer to the lfsck instance
+ * \param[in] obj      pointer to the dt_object to be locked
+ * \param[out] lh      pointer to the lock handle
+ * \param[in] ibits    the bits for the ldlm lock to be acquired
+ * \param[in] mode     the mode for the ldlm lock to be acquired
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
+                    struct dt_object *obj, struct lustre_handle *lh,
+                    __u64 bits, ldlm_mode_t mode)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       ldlm_policy_data_t              *policy = &info->lti_policy;
+       struct ldlm_res_id              *resid  = &info->lti_resid;
+       __u64                            flags  = LDLM_FL_ATOMIC_CB;
+       int                              rc;
+
+       LASSERT(lfsck->li_namespace != NULL);
+
+       memset(policy, 0, sizeof(*policy));
+       policy->l_inodebits.bits = bits;
+       fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
+       rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
+                                   policy, mode, &flags, ldlm_blocking_ast,
+                                   ldlm_completion_ast, NULL, NULL, 0,
+                                   LVB_T_NONE, NULL, lh);
+       if (rc == ELDLM_OK) {
+               rc = 0;
+       } else {
+               memset(lh, 0, sizeof(*lh));
+               rc = -EIO;
+       }
+
+       return rc;
+}
+
+/**
+ * Release the the specified ibits lock.
+ *
+ * If the lock has been acquired before, release it
+ * and cleanup the handle. Otherwise, do nothing.
+ *
+ * \param[in] lh       pointer to the lock handle
+ * \param[in] mode     the mode for the ldlm lock to be released
+ */
+void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
+{
+       if (lustre_handle_is_used(lh)) {
+               ldlm_lock_decref(lh, mode);
+               memset(lh, 0, sizeof(*lh));
+       }
+}
+
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 static const char dotlustre[] = ".lustre";
@@ -696,6 +758,7 @@ int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        struct dt_object_format  *dof   = &info->lti_dof;
        struct dt_object         *parent = NULL;
        struct dt_object         *child = NULL;
+       struct lustre_handle      lh    = { 0 };
        char                      name[8];
        int                       node  = lfsck_dev_idx(lfsck->li_bottom);
        int                       rc    = 0;
@@ -721,9 +784,17 @@ int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        if (IS_ERR(parent))
                RETURN(PTR_ERR(parent));
 
+       if (lfsck->li_lpf_obj != NULL)
+               GOTO(out, rc = 0);
+
        if (unlikely(!dt_try_as_dir(env, parent)))
                GOTO(out, rc = -ENOTDIR);
 
+       rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
+                             MDS_INODELOCK_UPDATE, LCK_EX);
+       if (rc != 0)
+               GOTO(out, rc);
+
        mutex_lock(&lfsck->li_mutex);
        if (lfsck->li_lpf_obj != NULL)
                GOTO(unlock, rc = 0);
@@ -784,6 +855,7 @@ int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
 unlock:
        mutex_unlock(&lfsck->li_mutex);
+       lfsck_ibits_unlock(&lh, LCK_EX);
        if (rc != 0 && child != NULL && !IS_ERR(child))
                lu_object_put(env, &child->do_lu);
 out: