Whamcloud - gitweb
b=16721 fix rename pdif lock, when source and target in the same directory.
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index dc99028..60dbfc2 100644 (file)
@@ -108,7 +108,8 @@ int mdt_version_get_check(struct mdt_thread_info *info, int index)
                 RETURN(0);
 
         LASSERT(info->mti_mos[index]);
-        LASSERT(mdt_object_exists(info->mti_mos[index]));
+        if (mdt_object_exists(info->mti_mos[index]) == 0)
+                RETURN(-ESTALE);
         mo = mdt_object_child(info->mti_mos[index]);
 
         curr_version = mo_version_get(info->mti_env, mo);
@@ -120,16 +121,16 @@ int mdt_version_get_check(struct mdt_thread_info *info, int index)
                 /** Sanity check for malformed buffers */
                 if (pre_versions == NULL) {
                         CERROR("No versions in request buffer\n");
-                        spin_lock(&req->rq_export->exp_lock);
+                        cfs_spin_lock(&req->rq_export->exp_lock);
                         req->rq_export->exp_vbr_failed = 1;
-                        spin_unlock(&req->rq_export->exp_lock);
+                        cfs_spin_unlock(&req->rq_export->exp_lock);
                         RETURN(-EOVERFLOW);
                 } else if (pre_versions[index] != curr_version) {
                         CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
                                pre_versions[index], curr_version);
-                        spin_lock(&req->rq_export->exp_lock);
+                        cfs_spin_lock(&req->rq_export->exp_lock);
                         req->rq_export->exp_vbr_failed = 1;
-                        spin_unlock(&req->rq_export->exp_lock);
+                        cfs_spin_unlock(&req->rq_export->exp_lock);
                         RETURN(-EOVERFLOW);
                 }
         }
@@ -382,9 +383,9 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 mfd->mfd_object = mo;
                 mfd->mfd_xid = req->rq_xid;
 
-                spin_lock(&med->med_open_lock);
-                list_add(&mfd->mfd_list, &med->med_open_head);
-                spin_unlock(&med->med_open_lock);
+                cfs_spin_lock(&med->med_open_lock);
+                cfs_list_add(&mfd->mfd_list, &med->med_open_head);
+                cfs_spin_unlock(&med->med_open_lock);
                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
         }
 
@@ -395,10 +396,10 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
                 LASSERT(info->mti_ioepoch);
 
-                spin_lock(&med->med_open_lock);
+                cfs_spin_lock(&med->med_open_lock);
                 mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
                 if (mfd == NULL) {
-                        spin_unlock(&med->med_open_lock);
+                        cfs_spin_unlock(&med->med_open_lock);
                         CDEBUG(D_INODE, "no handle for file close: "
                                "fid = "DFID": cookie = "LPX64"\n",
                                PFID(info->mti_rr.rr_fid1),
@@ -409,8 +410,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
 
                 class_handle_unhash(&mfd->mfd_handle);
-                list_del_init(&mfd->mfd_list);
-                spin_unlock(&med->med_open_lock);
+                cfs_list_del_init(&mfd->mfd_list);
+                cfs_spin_unlock(&med->med_open_lock);
 
                 /* Close the found mfd, update attributes. */
                 ma->ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
@@ -713,6 +714,37 @@ out_unlock_parent:
         mdt_object_unlock_put(info, mp, lhp, rc);
         return rc;
 }
+/**
+ * lock the part of the directory according to the hash of the name
+ * (lh->mlh_pdo_hash) in parallel directory lock.
+ */
+static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
+                       struct mdt_lock_handle *lh,
+                       struct mdt_object *obj, __u64 ibits)
+{
+        struct ldlm_res_id *res_id = &info->mti_res_id;
+        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+        ldlm_policy_data_t *policy = &info->mti_policy;
+        int rc;
+
+        /*
+         * Finish res_id initializing by name hash marking part of
+         * directory which is taking modification.
+         */
+        LASSERT(lh->mlh_pdo_hash != 0);
+        fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res_id);
+        memset(policy, 0, sizeof(*policy));
+        policy->l_inodebits.bits = ibits;
+        /*
+         * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
+         * going to be sent to client. If it is - mdt_intent_policy() path will
+         * fix it up and turn FL_LOCAL flag off.
+         */
+        rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
+                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+                          &info->mti_exp->exp_handle.h_cookie);
+        return rc;
+}
 
 /* partial operation for rename */
 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
@@ -940,6 +972,12 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
                 mdt_object_get(info->mti_env, msrcdir);
                 mtgtdir = msrcdir;
+                if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
+                         rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
+                                                 MDS_INODELOCK_UPDATE);
+                         if (rc)
+                                 GOTO(out_unlock_source, rc);
+                }
         } else {
                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
                                           rr->rr_fid2);