Whamcloud - gitweb
changes to protect directory splitting from concurrent modifies/lookups:
authoralex <alex>
Thu, 3 Jun 2004 11:23:51 +0000 (11:23 +0000)
committeralex <alex>
Thu, 3 Jun 2004 11:23:51 +0000 (11:23 +0000)
 - mds_fid2locked_dentry() takes UPDATE lock on the inode. depending on the
   given mode for LOOKUP lock new routine mds_lock_mode_for_dir() learns
   what mode to use for UPDATE lock. it could be:
   - LCK_CR - lookup case. it protects directory from concurrent splitting
     and don't invalidate client cache readdir()
   - LCK_CW - modify case. it protects directory from concurrent splitting
     and invalidate client cache for readdir()
   - LCK_EX - modify with possible splitting. protects from any parallel access

 - mds_getattr_name(), mds_open(), mds_reint_setattr(), mds_reint_create(),
   mds_get_parent_child_locked(), mds_reint_unlink(), mds_reint_rename() and
   mds_get_parents_children_lock() have been modified to play new game with
   directory locking

 - mds_splitting_expected() predicts splitting possibility

lustre/include/linux/lustre_mds.h
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_lmv.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c

index 9799ca0..04a0745 100644 (file)
@@ -165,7 +165,7 @@ int mds_reint_rec(struct mds_update_record *r, int offset,
 #ifdef __KERNEL__
 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
-                                     struct lustre_handle *lockh,
+                                     struct lustre_handle *lockh, int *pmode,
                                      char *name, int namelen, __u64 lockpart);
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                               struct vfsmount **mnt);
index ec6cd9f..5ffc3f5 100644 (file)
@@ -155,10 +155,50 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
         return rc;
 }
 
+int mds_lock_mode_for_dir(struct obd_device *obd,
+                              struct dentry *dentry, int mode)
+{
+        int ret_mode;
+
+        /* any dir access needs couple locks:
+         * 1) on part of dir we gonna lookup/modify in
+         * 2) on a whole dir to protect it from concurrent splitting
+         *    and to flush client's cache for readdir()
+         * so, for a given mode and dentry this routine decides what
+         * lock mode to use for lock #2:
+         * 1) if caller's gonna lookup in dir then we need to protect
+         *    dir from being splitted only - LCK_CR
+         * 2) if caller's gonna modify dir then we need to protect
+         *    dir from being splitted and to flush cache - LCK_CW
+         * 3) if caller's gonna modify dir and that dir seems ready
+         *    for splitting then we need to protect it from any
+         *    type of access (lookup/modify/split) - LCK_EX -bzzz */
+
+        if (mode == LCK_PR) {
+                ret_mode = LCK_CR;
+        } else if (mode == LCK_PW) {
+                /* caller gonna modify directory.we use concurrent
+                   write lock here to retract client's cache for readdir */
+                ret_mode = LCK_CW;
+                if (mds_splitting_expected(obd, dentry)) {
+                        /* splitting possible. serialize any access */
+                        CERROR("%s: gonna split %lu/%lu\n",
+                               obd->obd_name,
+                               (unsigned long) dentry->d_inode->i_ino,
+                               (unsigned long) dentry->d_inode->i_generation);
+                        ret_mode = LCK_EX;
+                }
+        } else {
+                CWARN("unexpected lock mode %d\n", mode);
+                ret_mode = LCK_EX;
+        }
+        return ret_mode;
+}
+
 /* only valid locked dentries or errors should be returned */
 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
-                                     struct lustre_handle *lockh,
+                                     struct lustre_handle *lockh, int *mode,
                                      char *name, int namelen, __u64 lockpart)
 {
         struct mds_obd *mds = &obd->u.mds;
@@ -174,27 +214,27 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
 
         res_id.name[0] = de->d_inode->i_ino;
         res_id.name[1] = de->d_inode->i_generation;
-#ifdef S_PDIROPS
         lockh[1].cookie = 0;
+#ifdef S_PDIROPS
         if (name && IS_PDIROPS(de->d_inode)) {
                 ldlm_policy_data_t cpolicy =
                         { .l_inodebits = { MDS_INODELOCK_UPDATE } };
-                /* lock just dir { ino, generation } to flush client cache */
-                if (lock_mode == LCK_PW) {
-                        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                              res_id, LDLM_IBITS,
-                                              &cpolicy, LCK_CW, &flags,
-                                              mds_blocking_ast,
-                                              ldlm_completion_ast, NULL, NULL,
-                                              NULL, 0, NULL, lockh + 1);
-                        if (rc != ELDLM_OK) {
-                                l_dput(de);
-                                RETURN(ERR_PTR(-ENOLCK));
-                        }
-                       flags = 0;
+                LASSERT(mode != NULL);
+                *mode = mds_lock_mode_for_dir(obd, de, lock_mode);
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                                res_id, LDLM_IBITS,
+                                &cpolicy, *mode, &flags,
+                                mds_blocking_ast,
+                                ldlm_completion_ast, NULL, NULL,
+                                NULL, 0, NULL, lockh + 1);
+                if (rc != ELDLM_OK) {
+                        l_dput(de);
+                        RETURN(ERR_PTR(-ENOLCK));
                 }
+                flags = 0;
 
                 res_id.name[2] = full_name_hash(name, namelen);
+
                 CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
                        de->d_inode->i_ino, de->d_inode->i_generation,
                        res_id.name[2]);
@@ -763,7 +803,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         struct dentry *dparent = NULL, *dchild = NULL;
         struct lvfs_ucred uc;
         struct lustre_handle parent_lockh[2];
-        int namesize;
+        int namesize, update_mode;
         int rc = 0, cleanup_phase = 0, resent_req = 0;
         char *name;
         ENTRY;
@@ -811,7 +851,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                        obd->obd_name, (unsigned long) body->fid1.id,
                        (unsigned long) body->fid1.generation);
                 dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
-                                               parent_lockh, NULL, 0, child_part);
+                                               parent_lockh, &update_mode, 
+                                               NULL, 0, child_part);
                 if (IS_ERR(dchild)) {
                         CERROR("can't find inode: %d\n", (int) PTR_ERR(dchild));
                         GOTO(cleanup, rc = PTR_ERR(dchild));
@@ -819,7 +860,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                 memcpy(child_lockh, parent_lockh, sizeof(parent_lockh[0]));
 #ifdef S_PDIROPS
                 if (parent_lockh[1].cookie)
-                        ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+                        ldlm_lock_decref(parent_lockh + 1, update_mode);
 #endif
                 cleanup_phase = 2;
                 goto fill_inode;
@@ -857,7 +898,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
                                                  parent_lockh, &dparent,
                                                  LCK_PR, MDS_INODELOCK_LOOKUP,
-                                                 name, namesize,
+                                                 &update_mode, name, namesize,
                                                  child_lockh, &dchild, LCK_PR,
                                                  child_part);
                 if (rc)
@@ -915,7 +956,7 @@ fill_inode:
 #ifdef S_PDIROPS
                                 if (parent_lockh[1].cookie != 0)
                                         ldlm_lock_decref(parent_lockh + 1,
-                                                        LCK_CW);
+                                                         update_mode);
 #endif
                         }
                         if (dparent)
index 67d8a9e..253ab59 100644 (file)
@@ -42,7 +42,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 struct ll_fid *fid,
                                 struct lustre_handle *parent_lockh,
                                 struct dentry **dparentp, int parent_mode,
-                                __u64 parent_lockpart,
+                                __u64 parent_lockpart, int *update_mode,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
                                 struct dentry **dchildp, int child_mode,
@@ -107,6 +107,8 @@ extern struct lvfs_callback_ops mds_lvfs_ops;
 int mds_lov_clean(struct obd_device *obd);
 extern int mds_iocontrol(unsigned int cmd, struct obd_export *exp,
                          int len, void *karg, void *uarg);
+extern int mds_lock_mode_for_dir(struct obd_device *, struct dentry *, int);
+
 #ifdef __KERNEL__
 int mds_get_md(struct obd_device *, struct inode *, void *md, int *size, 
                int lock);
@@ -125,5 +127,6 @@ int mds_try_to_split_dir(struct obd_device *, struct dentry *, struct mea **,
 int mds_get_lmv_attr(struct obd_device *, struct inode *, struct mea **, int *);
 int mds_choose_mdsnum(struct obd_device *, const char *, int, int);
 int mds_lmv_postsetup(struct obd_device *);
+int mds_splitting_expected(struct obd_device *, struct dentry *);
 
 #endif /* _MDS_INTERNAL_H */
index a49bf89..54134df 100644 (file)
@@ -411,6 +411,46 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
 
 #define MAX_DIR_SIZE    (64 * 1024)
 
+int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct mea *mea = NULL;
+        int rc, size;
+
+       /* clustered MD ? */
+       if (!mds->mds_lmv_obd)
+               RETURN(0);
+
+        /* inode exist? */
+        if (dentry->d_inode == NULL)
+                return 0;
+
+        /* a dir can be splitted only */
+        if (!S_ISDIR(dentry->d_inode->i_mode))
+                return 0;
+
+        /* large enough to be splitted? */
+        if (dentry->d_inode->i_size < MAX_DIR_SIZE)
+                return 0;
+
+        /* don't split root directory */
+        if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
+                return 0;
+
+        mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
+        if (mea) {
+                /* already splitted or slave object: shouldn't be splitted */
+                rc = 0;
+        } else {
+                /* may be splitted */
+                rc = 1;
+        }
+
+        if (mea)
+                OBD_FREE(mea, size);
+        RETURN(rc);
+}
+
 /*
  * must not be called on already splitted directories
  */
@@ -425,22 +465,10 @@ int mds_try_to_split_dir(struct obd_device *obd,
        void *handle;
        ENTRY;
 
-       /* clustered MD ? */
-       if (!mds->mds_lmv_obd)
-               RETURN(0);
-
-        /* don't split root directory */
-        if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
-                RETURN(0);
-
-        /* we want to split only large dirs. this may be already
-         * splitted dir or a slave dir created during splitting */
-        if (dir->i_size < MAX_DIR_SIZE)
-                RETURN(0);
-
-        /* check is directory marked non-splittable */
-        if (mea && *mea)
+        /* TODO: optimization possible - we already may have mea here */
+        if (!mds_splitting_expected(obd, dentry))
                 RETURN(0);
+        LASSERT(mea == NULL || *mea == NULL);
 
         CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n",
                obd->obd_name, mds->mds_num, dir->i_ino,
@@ -459,8 +487,6 @@ int mds_try_to_split_dir(struct obd_device *obd,
                 RETURN(-ENOMEM);
         (*mea)->mea_count = nstripes;
        
-#warning "we have to take EX lock on a dir for splitting"
-        
        /* 1) create directory objects on slave MDS'es */
        /* FIXME: should this be OBD method? */
         oa = obdo_alloc();
index 47471f2..709f67c 100644 (file)
@@ -784,7 +784,7 @@ int mds_open(struct mds_update_record *rec, int offset,
         void *handle = NULL;
         struct dentry_params dp;
         struct mea *mea = NULL;
-        int mea_size;
+        int mea_size, update_mode;
         ENTRY;
 
         parent_lockh[0].cookie = 0;
@@ -856,7 +856,7 @@ int mds_open(struct mds_update_record *rec, int offset,
         }
         
         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
-                                        parent_lockh, rec->ur_name,
+                                        parent_lockh, &update_mode, rec->ur_name,
                                         rec->ur_namelen - 1,
                                         MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
@@ -922,7 +922,7 @@ got_child:
                                       NULL, 0, NULL, child_lockh);
 #ifdef S_PDIROPS
                 if (parent_lockh[1].cookie != 0)
-                        ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+                        ldlm_lock_decref(parent_lockh + 1, update_mode);
 #endif
                 ldlm_lock_decref(parent_lockh, parent_mode);
                 if (mea)
@@ -1113,7 +1113,7 @@ got_child:
                 l_dput(dparent);
 #ifdef S_PDIROPS
                 if (parent_lockh[1].cookie != 0)
-                        ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+                        ldlm_lock_decref(parent_lockh + 1, update_mode);
 #endif
                 if (rc)
                         ldlm_lock_decref(parent_lockh, parent_mode);
index 57f7c99..fe82cf2 100644 (file)
@@ -381,6 +381,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         struct dentry *de;
         struct inode *inode = NULL;
         struct lustre_handle lockh[2] = {{0}, {0}};
+        int parent_mode;
         void *handle = NULL;
         struct mds_logcancel_data *mlcd = NULL;
         int rc = 0, cleanup_phase = 0, err, locked = 0;
@@ -402,7 +403,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 if (rec->ur_iattr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID) )
                         lockpart |= MDS_INODELOCK_LOOKUP;
                 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
-                                           lockh, NULL, 0, lockpart);
+                                           lockh, &parent_mode, NULL, 0, lockpart);
                 if (IS_ERR(de))
                         GOTO(cleanup, rc = PTR_ERR(de));
                 locked = 1;
@@ -502,7 +503,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 if (locked) {
 #ifdef S_PDIROPS
                         if (lockh[1].cookie != 0)
-                                ldlm_lock_decref(lockh + 1, LCK_CW);
+                                ldlm_lock_decref(lockh + 1, parent_mode);
 #endif
                         if (rc) {
                                 ldlm_lock_decref(lockh, LCK_PW);
@@ -557,6 +558,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         struct inode *dir = NULL;
         void *handle = NULL;
         struct lustre_handle lockh[2] = {{0}, {0}};
+        int parent_mode;
         int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
         int created = 0;
         struct dentry_params dp;
@@ -577,8 +579,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc = -ESTALE);
 
         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, lockh,
-                                        rec->ur_name, rec->ur_namelen - 1,
-                                        MDS_INODELOCK_UPDATE);
+                                        &parent_mode, rec->ur_name,
+                                        rec->ur_namelen - 1, MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
                 rc = PTR_ERR(dparent);
                 CERROR("parent lookup error %d\n", rc);
@@ -858,7 +860,7 @@ cleanup:
         case 1: /* locked parent dentry */
 #ifdef S_PDIROPS
                 if (lockh[1].cookie != 0)
-                        ldlm_lock_decref(lockh + 1, LCK_CW);
+                        ldlm_lock_decref(lockh + 1, parent_mode);
 #endif
                 if (rc) {
                         ldlm_lock_decref(lockh, LCK_PW);
@@ -1185,7 +1187,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 struct ll_fid *fid,
                                 struct lustre_handle *parent_lockh,
                                 struct dentry **dparentp, int parent_mode,
-                                __u64 parent_lockpart,
+                                __u64 parent_lockpart, int *update_mode,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
                                 struct dentry **dchildp, int child_mode,
@@ -1215,23 +1217,19 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
 #ifdef S_PDIROPS
         parent_lockh[1].cookie = 0;
         if (name && IS_PDIROPS((*dparentp)->d_inode)) {
-                /* lock just dir { ino, generation } to flush client cache */
-                if (parent_mode == LCK_PW) {
-                        struct ldlm_res_id res_id = { .name = {0} };
-                        ldlm_policy_data_t policy;
-                        int flags = 0;
-                        res_id.name[0] = (*dparentp)->d_inode->i_ino;
-                        res_id.name[1] = (*dparentp)->d_inode->i_generation;
-                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                              res_id, LDLM_IBITS,
-                                              &policy, LCK_CW, &flags,
-                                              mds_blocking_ast,
-                                              ldlm_completion_ast, NULL, NULL,
-                                              NULL, 0, NULL, parent_lockh+1);
-                        if (rc != ELDLM_OK)
-                                RETURN(-ENOLCK);
-                }
+                struct ldlm_res_id res_id = { .name = {0} };
+                ldlm_policy_data_t policy;
+                int flags = 0;
+                *update_mode = mds_lock_mode_for_dir(obd, *dparentp, parent_mode);
+                res_id.name[0] = (*dparentp)->d_inode->i_ino;
+                res_id.name[1] = (*dparentp)->d_inode->i_generation;
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
+                                      LDLM_IBITS, &policy, *update_mode, &flags,
+                                      mds_blocking_ast, ldlm_completion_ast,
+                                      NULL, NULL, NULL, 0, NULL, parent_lockh+1);
+                if (rc != ELDLM_OK)
+                        RETURN(-ENOLCK);
 
                 parent_res_id.name[2] = full_name_hash(name, namelen - 1);
                 CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
@@ -1312,7 +1310,7 @@ cleanup:
                 case 1:
 #ifdef S_PDIROPS
                         if (parent_lockh[1].cookie)
-                                ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+                                ldlm_lock_decref(parent_lockh + 1, *update_mode);
 #endif
                         l_dput(*dparentp);
                 default: ;
@@ -1371,7 +1369,8 @@ int mds_create_local_dentry(struct mds_update_record *rec,
         d_drop(new_child);
 
         child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX,
-                                      lockh, NULL, 0, MDS_INODELOCK_UPDATE);
+                                      lockh, NULL, NULL, 0,
+                                      MDS_INODELOCK_UPDATE);
         if (IS_ERR(child)) {
                 CERROR("can't get victim\n");
                 GOTO(cleanup, rc = PTR_ERR(child));
@@ -1475,6 +1474,7 @@ static int mds_copy_unlink_reply(struct ptlrpc_request *master,
 static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
                                    struct ptlrpc_request *req,
                                    struct lustre_handle *parent_lockh,
+                                   int update_mode,
                                    struct dentry *dparent,
                                    struct lustre_handle *child_lockh,
                                    struct dentry *dchild)
@@ -1510,7 +1510,7 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
 
 #ifdef S_PDIROPS
         if (parent_lockh[1].cookie != 0)
-                ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+                ldlm_lock_decref(parent_lockh + 1, update_mode);
 #endif
         ldlm_lock_decref(child_lockh, LCK_EX);
         if (rc)
@@ -1538,6 +1538,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         void *handle = NULL;
         int rc = 0, log_unlink = 0, cleanup_phase = 0;
         int unlink_by_fid = 0;
+        int update_mode;
         ENTRY;
 
         LASSERT(offset == 0 || offset == 2);
@@ -1559,7 +1560,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         }
         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
                                          parent_lockh, &dparent, LCK_PW,
-                                         MDS_INODELOCK_UPDATE,
+                                         MDS_INODELOCK_UPDATE, &update_mode,
                                          rec->ur_name, rec->ur_namelen,
                                          &child_lockh, &dchild, LCK_EX,
                                          MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE);
@@ -1571,7 +1572,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 LASSERT(unlink_by_fid == 0);
                 LASSERT(dchild->d_mdsnum != mds->mds_num);
                 mds_reint_unlink_remote(rec, offset, req, parent_lockh,
-                                             dparent, &child_lockh, dchild);
+                                        update_mode, dparent, &child_lockh, dchild);
                 RETURN(0);
         }
 
@@ -1735,7 +1736,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         case 1: /* child and parent dentry, parent lock */
 #ifdef S_PDIROPS
                 if (parent_lockh[1].cookie != 0)
-                        ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+                        ldlm_lock_decref(parent_lockh + 1, update_mode);
 #endif
                 if (rc)
                         ldlm_lock_decref(parent_lockh, LCK_PW);
@@ -1843,6 +1844,7 @@ static int mds_reint_link_to_remote(struct mds_update_record *rec,
         int rc = 0, cleanup_phase = 0;
         struct mdc_op_data op_data;
         struct ptlrpc_request *request = NULL;
+        int update_mode;
         ENTRY;
 
 #define fmt     "%s: request to link %u/%u/%u:%*s to foreign inode %u/%u/%u\n"
@@ -1856,8 +1858,8 @@ static int mds_reint_link_to_remote(struct mds_update_record *rec,
                   (unsigned)rec->ur_fid1->generation);
 
         de_tgt_dir = mds_fid2locked_dentry(obd, rec->ur_fid2, NULL, LCK_EX,
-                                           tgt_dir_lockh, rec->ur_name,
-                                           rec->ur_namelen - 1,
+                                           tgt_dir_lockh, &update_mode,
+                                           rec->ur_name, rec->ur_namelen - 1,
                                            MDS_INODELOCK_UPDATE);
         if (IS_ERR(de_tgt_dir))
                 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
@@ -1903,12 +1905,12 @@ cleanup:
                         if (rc) {
                                 ldlm_lock_decref(tgt_dir_lockh, LCK_EX);
 #ifdef S_PDIROPS
-                                ldlm_lock_decref(tgt_dir_lockh + 1, LCK_CW);
+                                ldlm_lock_decref(tgt_dir_lockh + 1, update_mode);
 #endif
                         } else {
                                 ptlrpc_save_lock(req, tgt_dir_lockh, LCK_EX);
 #ifdef S_PDIROPS
-                                ptlrpc_save_lock(req, tgt_dir_lockh + 1, LCK_CW);
+                                ptlrpc_save_lock(req, tgt_dir_lockh+1, update_mode);
 #endif
                         }
                         l_dput(de_tgt_dir);
@@ -1937,8 +1939,8 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         ldlm_policy_data_t src_policy ={.l_inodebits = {MDS_INODELOCK_UPDATE}};
         ldlm_policy_data_t tgt_dir_policy =
                                        {.l_inodebits = {MDS_INODELOCK_UPDATE}};
-
         int rc = 0, cleanup_phase = 0;
+        int update_mode = 0;
         ENTRY;
 
         LASSERT(offset == 0);
@@ -1991,10 +1993,10 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
 #ifdef S_PDIROPS
         if (IS_PDIROPS(de_tgt_dir->d_inode)) {
                 int flags = 0;
-                /* Get a temp lock on just ino, gen to flush client cache */
+                update_mode = mds_lock_mode_for_dir(obd, de_tgt_dir, LCK_EX);
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
                                       tgt_dir_res_id, LDLM_IBITS, &src_policy,
-                                      LCK_CW, &flags, mds_blocking_ast,
+                                      update_mode, &flags, mds_blocking_ast,
                                       ldlm_completion_ast, NULL, NULL,
                                       NULL, 0, NULL, tgt_dir_lockh + 1);
                 if (rc != ELDLM_OK)
@@ -2065,8 +2067,8 @@ cleanup:
                 }
         case 2: /* target dentry */
 #ifdef S_PDIROPS
-                if (tgt_dir_lockh[1].cookie)
-                        ldlm_lock_decref(tgt_dir_lockh + 1, LCK_CW);
+                if (tgt_dir_lockh[1].cookie && update_mode)
+                        ldlm_lock_decref(tgt_dir_lockh + 1, update_mode);
 #endif
                 if (de_tgt_dir)
                         l_dput(de_tgt_dir);
@@ -2216,10 +2218,11 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         dlm_handles[5].cookie = 0;
         dlm_handles[6].cookie = 0;
         if (IS_PDIROPS((*de_srcdirp)->d_inode)) {
-                /* Get a temp lock on just ino, gen to flush client cache */
+                /* Get a temp lock on just ino, gen to flush client cache and
+                 * to protect dirs from concurrent splitting */
                 rc = enqueue_ordered_locks(obd, &p1_res_id, &(dlm_handles[5]),
-                                           LCK_CW, &p_policy, &p2_res_id,
-                                           &(dlm_handles[6]),LCK_CW,&p_policy);
+                                           LCK_PW, &p_policy, &p2_res_id,
+                                           &(dlm_handles[6]),LCK_PW,&p_policy);
                 if (rc != ELDLM_OK)
                         GOTO(cleanup, rc);
 
@@ -2376,7 +2379,7 @@ static int mds_reint_rename_create_name(struct mds_update_record *rec,
         struct lustre_handle child_lockh = {0};
         int cleanup_phase = 0;
         void *handle = NULL;
-        int rc = 0;
+        int update_mode, rc = 0;
         ENTRY;
 
         /* another MDS executing rename operation has asked us
@@ -2393,7 +2396,7 @@ static int mds_reint_rename_create_name(struct mds_update_record *rec,
         child_lockh.cookie = 0;
         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid2, parent_lockh,
                                          &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE,
-                                         rec->ur_tgt, rec->ur_tgtlen,
+                                         &update_mode, rec->ur_tgt, rec->ur_tgtlen,
                                          &child_lockh, &de_new, LCK_EX,
                                          MDS_INODELOCK_LOOKUP);
         if (rc)
@@ -2449,7 +2452,7 @@ cleanup:
                 case 1:
 #ifdef S_PDIROPS
                         if (parent_lockh[1].cookie != 0)
-                                ldlm_lock_decref(&parent_lockh[1], LCK_CW);
+                                ldlm_lock_decref(&parent_lockh[1], update_mode);
 #endif
                         ldlm_lock_decref(&parent_lockh[0], LCK_PW);
                         if (child_lockh.cookie != 0)
@@ -2478,7 +2481,7 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
         struct lustre_handle child_lockh = {0};
         struct mdc_op_data opdata;
         void *handle = NULL;
-        int rc = 0;
+        int update_mode, rc = 0;
         ENTRY;
 
         CDEBUG(D_OTHER, "%s: move name %s onto another mds%u\n",
@@ -2488,9 +2491,9 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
         child_lockh.cookie = 0;
         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, parent_lockh,
                                          &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE,
-                                         rec->ur_name, rec->ur_namelen,
-                                         &child_lockh, &de_old, LCK_EX,
-                                         MDS_INODELOCK_LOOKUP);
+                                         &update_mode, rec->ur_name, 
+                                         rec->ur_namelen, &child_lockh, &de_old,
+                                         LCK_EX, MDS_INODELOCK_LOOKUP);
         LASSERT(rc == 0);
         LASSERT(de_srcdir);
         LASSERT(de_srcdir->d_inode);
@@ -2539,7 +2542,7 @@ cleanup:
 
 #ifdef S_PDIROPS
         if (parent_lockh[1].cookie != 0)
-                ldlm_lock_decref(&parent_lockh[1], LCK_CW);
+                ldlm_lock_decref(&parent_lockh[1], update_mode);
 #endif
         ldlm_lock_decref(&parent_lockh[0], LCK_PW);
         if (child_lockh.cookie != 0)
@@ -2669,9 +2672,9 @@ cleanup:
         case 1:
 #ifdef S_PDIROPS
                 if (dlm_handles[5].cookie != 0)
-                        ldlm_lock_decref(&(dlm_handles[5]), LCK_CW);
+                        ldlm_lock_decref(&(dlm_handles[5]), LCK_PW);
                 if (dlm_handles[6].cookie != 0)
-                        ldlm_lock_decref(&(dlm_handles[6]), LCK_CW);
+                        ldlm_lock_decref(&(dlm_handles[6]), LCK_PW);
 #endif
                 if (rc) {
                         if (lock_count == 4)