Whamcloud - gitweb
LU-2430 mdt: Add global rename lock.
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 9de12fa..c1b0020 100644 (file)
@@ -282,21 +282,13 @@ static int mdt_md_create(struct mdt_thread_info *info)
                GOTO(put_parent, rc = -ENOENT);
 
        lh = &info->mti_lh[MDT_LH_PARENT];
-       if (mdt_object_remote(parent)) {
-               mdt_lock_reg_init(lh, LCK_EX);
-               rc = mdt_remote_object_lock(info, parent, &lh->mlh_rreg_lh,
-                                           lh->mlh_rreg_mode,
-                                           MDS_INODELOCK_UPDATE);
-               if (rc != ELDLM_OK)
-                       GOTO(put_parent, rc);
-
-       } else {
-               mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
-               rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE,
-                                    MDT_LOCAL_LOCK);
-               if (rc)
-                       GOTO(put_parent, rc);
+       mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
+       rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE,
+                            MDT_CROSS_LOCK);
+       if (rc)
+               GOTO(put_parent, rc);
 
+       if (!mdt_object_remote(parent)) {
                rc = mdt_version_get_check_save(info, parent, 0);
                if (rc)
                        GOTO(unlock_parent, rc);
@@ -803,22 +795,13 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        }
 
        parent_lh = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
+       rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
+                            MDT_CROSS_LOCK);
+       if (rc != 0)
+               GOTO(put_parent, rc);
 
-       if (mdt_object_remote(mp)) {
-               mdt_lock_reg_init(parent_lh, LCK_EX);
-               rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh,
-                                           parent_lh->mlh_rreg_mode,
-                                           MDS_INODELOCK_UPDATE);
-               if (rc != ELDLM_OK)
-                       GOTO(put_parent, rc);
-
-       } else {
-               mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
-               rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
-                                    MDT_LOCAL_LOCK);
-               if (rc)
-                       GOTO(put_parent, rc);
-
+       if (!mdt_object_remote(mp)) {
                rc = mdt_version_get_check_save(info, mp, 0);
                if (rc)
                        GOTO(unlock_parent, rc);
@@ -1066,7 +1049,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
        }
 
        rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE |
-                            MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
+                            MDS_INODELOCK_XATTR, MDT_LOCAL_LOCK);
         if (rc != 0) {
                 mdt_object_put(info->mti_env, ms);
                 GOTO(out_unlock_parent, rc);
@@ -1143,46 +1126,57 @@ static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
 }
 
 static int mdt_rename_lock(struct mdt_thread_info *info,
-                           struct lustre_handle *lh)
+                          struct lustre_handle *lh, bool rename)
 {
        struct ldlm_namespace   *ns = info->mti_mdt->mdt_namespace;
+       __u64                   flags = 0;
+       int                     rc;
        ldlm_policy_data_t      *policy = &info->mti_policy;
        struct ldlm_res_id      *res_id = &info->mti_res_id;
-       __u64                   flags = 0;
-       int rc;
        ENTRY;
 
-       fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
+       /* XXX only do global rename lock for migration */
+       if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0 && !rename) {
+               struct lu_fid *fid = &info->mti_tmp_fid1;
+               struct mdt_object *obj;
+
+               /* XXX, right now, it has to use object API to
+                * enqueue lock cross MDT, so it will enqueue
+                * rename lock(with LUSTRE_BFL_FID) by root object */
+               lu_root_fid(fid);
+               obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+               if (IS_ERR(obj))
+                       RETURN(PTR_ERR(obj));
+
+               LASSERT(mdt_object_remote(obj));
+               rc = mdt_remote_object_lock(info, obj,
+                                           &LUSTRE_BFL_FID, lh,
+                                           LCK_EX,
+                                           MDS_INODELOCK_UPDATE);
+               mdt_object_put(info->mti_env, obj);
+       } else {
+               fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
+               memset(policy, 0, sizeof *policy);
+               policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+               flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
+               rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
+                                           LCK_EX, &flags, ldlm_blocking_ast,
+                                           ldlm_completion_ast, NULL, NULL, 0,
+                                           LVB_T_NONE,
+                                           &info->mti_exp->exp_handle.h_cookie,
+                                           lh);
+       }
 
-       memset(policy, 0, sizeof *policy);
-       policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 6, 53, 0)
-       /* In phase I, we will not do cross-rename, so local BFL lock would
-        * be enough
-        */
-       flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
-       /*
-        * Current node is controller, that is mdt0, where we should
-        * take BFL lock.
-        */
-       rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
-                                   LCK_EX, &flags, ldlm_blocking_ast,
-                                   ldlm_completion_ast, NULL, NULL, 0,
-                                   LVB_T_NONE,
-                                   &info->mti_exp->exp_handle.h_cookie,
-                                   lh);
-#else
-#warning "Local rename lock is invalid for DNE phase II."
-#endif
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static void mdt_rename_unlock(struct lustre_handle *lh)
 {
-        ENTRY;
-        LASSERT(lustre_handle_is_used(lh));
-        ldlm_lock_decref(lh, LCK_EX);
-        EXIT;
+       ENTRY;
+       LASSERT(lustre_handle_is_used(lh));
+       /* Cancel the single rename lock right away */
+       ldlm_lock_decref_and_cancel(lh, LCK_EX);
+       EXIT;
 }
 
 /*
@@ -1312,18 +1306,10 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info,
                        GOTO(out, rc = -ENOMEM);
                }
 
-               if (mdt_object_remote(mdt_pobj)) {
-                       mdt_lock_reg_init(&mll->mll_lh, LCK_EX);
-                       rc = mdt_remote_object_lock(info, mdt_pobj,
-                                                   &mll->mll_lh.mlh_rreg_lh,
-                                                   mll->mll_lh.mlh_rreg_mode,
-                                                   MDS_INODELOCK_UPDATE);
-               } else {
-                       mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
-                       rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
-                                            MDS_INODELOCK_UPDATE,
-                                            MDT_LOCAL_LOCK);
-               }
+               mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
+               rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
+                                    MDS_INODELOCK_UPDATE,
+                                    MDT_CROSS_LOCK);
                if (rc != 0) {
                        CERROR("%s: cannot lock "DFID": rc =%d\n",
                               mdt_obd_name(mdt), PFID(&fid), rc);
@@ -1371,22 +1357,14 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
        }
 
        lh_dirp = &info->mti_lh[MDT_LH_PARENT];
-       if (mdt_object_remote(msrcdir)) {
-               mdt_lock_reg_init(lh_dirp, LCK_EX);
-               rc = mdt_remote_object_lock(info, msrcdir,
-                                           &lh_dirp->mlh_rreg_lh,
-                                           lh_dirp->mlh_rreg_mode,
-                                           MDS_INODELOCK_UPDATE);
-               if (rc != ELDLM_OK)
-                       GOTO(out_put_parent, rc);
-       } else {
-               mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
-               rc = mdt_object_lock(info, msrcdir, lh_dirp,
-                                    MDS_INODELOCK_UPDATE,
-                                    MDT_LOCAL_LOCK);
-               if (rc)
-                       GOTO(out_put_parent, rc);
+       mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
+       rc = mdt_object_lock(info, msrcdir, lh_dirp,
+                            MDS_INODELOCK_UPDATE,
+                            MDT_CROSS_LOCK);
+       if (rc)
+               GOTO(out_put_parent, rc);
 
+       if (!mdt_object_remote(msrcdir)) {
                rc = mdt_version_get_check_save(info, msrcdir, 0);
                if (rc)
                        GOTO(out_unlock_parent, rc);
@@ -1476,6 +1454,7 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
                lh_tgtp = &info->mti_lh[MDT_LH_CHILD];
                mdt_lock_reg_init(lh_tgtp, LCK_EX);
                rc = mdt_remote_object_lock(info, mnew,
+                                           mdt_object_fid(mnew),
                                            &lh_tgtp->mlh_rreg_lh,
                                            lh_tgtp->mlh_rreg_mode,
                                            MDS_INODELOCK_UPDATE);
@@ -1693,7 +1672,7 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
                rc = mdt_object_lock(info, mnew, lh_newp,
                                     MDS_INODELOCK_LOOKUP |
                                     MDS_INODELOCK_UPDATE,
-                                    MDT_CROSS_LOCK);
+                                    MDT_LOCAL_LOCK);
                if (rc != 0)
                        GOTO(out_unlock_old, rc);
 
@@ -1790,7 +1769,7 @@ static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info,
            fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
                RETURN(-EPERM);
 
-       rc = mdt_rename_lock(info, &rename_lh);
+       rc = mdt_rename_lock(info, &rename_lh, rename);
        if (rc != 0) {
                CERROR("%s: can't lock FS for rename: rc  = %d\n",
                       mdt_obd_name(info->mti_mdt), rc);