From fb92b63f136df0176818a9515cc2f729fb4ebe9a Mon Sep 17 00:00:00 2001 From: alex Date: Thu, 3 Jun 2004 11:23:51 +0000 Subject: [PATCH] changes to protect directory splitting from concurrent modifies/lookups: - mds_fid2locked_dentry() takes an UPDATE lock on the inode. depending on the mode given for the LOOKUP lock, the new routine mds_lock_mode_for_dir() decides what mode to use for the UPDATE lock. it can be: - LCK_CR - lookup case. it protects the directory from concurrent splitting and doesn't invalidate the client's readdir() cache - LCK_CW - modify case. it protects the directory from concurrent splitting and invalidates the client's readdir() cache - LCK_EX - modify with possible splitting. protects from any parallel access - mds_getattr_name(), mds_open(), mds_reint_setattr(), mds_reint_create(), mds_get_parent_child_locked(), mds_reint_unlink(), mds_reint_rename() and mds_get_parents_children_locked() have been modified to follow the new directory locking scheme - mds_splitting_expected() predicts whether a directory split is likely --- lustre/include/linux/lustre_mds.h | 2 +- lustre/mds/handler.c | 81 +++++++++++++++++++++-------- lustre/mds/mds_internal.h | 5 +- lustre/mds/mds_lmv.c | 60 ++++++++++++++++------ lustre/mds/mds_open.c | 8 +-- lustre/mds/mds_reint.c | 105 ++++++++++++++++++++------------------ 6 files changed, 167 insertions(+), 94 deletions(-) diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 9799ca0..04a0745 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -165,7 +165,7 @@ int mds_reint_rec(struct mds_update_record *r, int offset, #ifdef __KERNEL__ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, - struct lustre_handle *lockh, + struct lustre_handle *lockh, int *pmode, char *name, int namelen, __u64 lockpart); struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt); diff --git a/lustre/mds/handler.c
b/lustre/mds/handler.c index ec6cd9f..5ffc3f5 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -155,10 +155,50 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, return rc; } +int mds_lock_mode_for_dir(struct obd_device *obd, + struct dentry *dentry, int mode) +{ + int ret_mode; + + /* any dir access needs couple locks: + * 1) on part of dir we gonna lookup/modify in + * 2) on a whole dir to protect it from concurrent splitting + * and to flush client's cache for readdir() + * so, for a given mode and dentry this routine decides what + * lock mode to use for lock #2: + * 1) if caller's gonna lookup in dir then we need to protect + * dir from being splitted only - LCK_CR + * 2) if caller's gonna modify dir then we need to protect + * dir from being splitted and to flush cache - LCK_CW + * 3) if caller's gonna modify dir and that dir seems ready + * for splitting then we need to protect it from any + * type of access (lookup/modify/split) - LCK_EX -bzzz */ + + if (mode == LCK_PR) { + ret_mode = LCK_CR; + } else if (mode == LCK_PW) { + /* caller gonna modify directory.we use concurrent + write lock here to retract client's cache for readdir */ + ret_mode = LCK_CW; + if (mds_splitting_expected(obd, dentry)) { + /* splitting possible. 
serialize any access */ + CERROR("%s: gonna split %lu/%lu\n", + obd->obd_name, + (unsigned long) dentry->d_inode->i_ino, + (unsigned long) dentry->d_inode->i_generation); + ret_mode = LCK_EX; + } + } else { + CWARN("unexpected lock mode %d\n", mode); + ret_mode = LCK_EX; + } + return ret_mode; +} + /* only valid locked dentries or errors should be returned */ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, - struct lustre_handle *lockh, + struct lustre_handle *lockh, int *mode, char *name, int namelen, __u64 lockpart) { struct mds_obd *mds = &obd->u.mds; @@ -174,27 +214,27 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, res_id.name[0] = de->d_inode->i_ino; res_id.name[1] = de->d_inode->i_generation; -#ifdef S_PDIROPS lockh[1].cookie = 0; +#ifdef S_PDIROPS if (name && IS_PDIROPS(de->d_inode)) { ldlm_policy_data_t cpolicy = { .l_inodebits = { MDS_INODELOCK_UPDATE } }; - /* lock just dir { ino, generation } to flush client cache */ - if (lock_mode == LCK_PW) { - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - res_id, LDLM_IBITS, - &cpolicy, LCK_CW, &flags, - mds_blocking_ast, - ldlm_completion_ast, NULL, NULL, - NULL, 0, NULL, lockh + 1); - if (rc != ELDLM_OK) { - l_dput(de); - RETURN(ERR_PTR(-ENOLCK)); - } - flags = 0; + LASSERT(mode != NULL); + *mode = mds_lock_mode_for_dir(obd, de, lock_mode); + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, + res_id, LDLM_IBITS, + &cpolicy, *mode, &flags, + mds_blocking_ast, + ldlm_completion_ast, NULL, NULL, + NULL, 0, NULL, lockh + 1); + if (rc != ELDLM_OK) { + l_dput(de); + RETURN(ERR_PTR(-ENOLCK)); } + flags = 0; res_id.name[2] = full_name_hash(name, namelen); + CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n", de->d_inode->i_ino, de->d_inode->i_generation, res_id.name[2]); @@ -763,7 +803,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req, struct dentry *dparent = NULL, *dchild = NULL; 
struct lvfs_ucred uc; struct lustre_handle parent_lockh[2]; - int namesize; + int namesize, update_mode; int rc = 0, cleanup_phase = 0, resent_req = 0; char *name; ENTRY; @@ -811,7 +851,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req, obd->obd_name, (unsigned long) body->fid1.id, (unsigned long) body->fid1.generation); dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR, - parent_lockh, NULL, 0, child_part); + parent_lockh, &update_mode, + NULL, 0, child_part); if (IS_ERR(dchild)) { CERROR("can't find inode: %d\n", (int) PTR_ERR(dchild)); GOTO(cleanup, rc = PTR_ERR(dchild)); @@ -819,7 +860,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req, memcpy(child_lockh, parent_lockh, sizeof(parent_lockh[0])); #ifdef S_PDIROPS if (parent_lockh[1].cookie) - ldlm_lock_decref(parent_lockh + 1, LCK_CW); + ldlm_lock_decref(parent_lockh + 1, update_mode); #endif cleanup_phase = 2; goto fill_inode; @@ -857,7 +898,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req, rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1, parent_lockh, &dparent, LCK_PR, MDS_INODELOCK_LOOKUP, - name, namesize, + &update_mode, name, namesize, child_lockh, &dchild, LCK_PR, child_part); if (rc) @@ -915,7 +956,7 @@ fill_inode: #ifdef S_PDIROPS if (parent_lockh[1].cookie != 0) ldlm_lock_decref(parent_lockh + 1, - LCK_CW); + update_mode); #endif } if (dparent) diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index 67d8a9e..253ab59 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -42,7 +42,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, struct ll_fid *fid, struct lustre_handle *parent_lockh, struct dentry **dparentp, int parent_mode, - __u64 parent_lockpart, + __u64 parent_lockpart, int *update_mode, char *name, int namelen, struct lustre_handle *child_lockh, struct dentry **dchildp, int child_mode, @@ -107,6 +107,8 @@ extern struct lvfs_callback_ops 
mds_lvfs_ops; int mds_lov_clean(struct obd_device *obd); extern int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg); +extern int mds_lock_mode_for_dir(struct obd_device *, struct dentry *, int); + #ifdef __KERNEL__ int mds_get_md(struct obd_device *, struct inode *, void *md, int *size, int lock); @@ -125,5 +127,6 @@ int mds_try_to_split_dir(struct obd_device *, struct dentry *, struct mea **, int mds_get_lmv_attr(struct obd_device *, struct inode *, struct mea **, int *); int mds_choose_mdsnum(struct obd_device *, const char *, int, int); int mds_lmv_postsetup(struct obd_device *); +int mds_splitting_expected(struct obd_device *, struct dentry *); #endif /* _MDS_INTERNAL_H */ diff --git a/lustre/mds/mds_lmv.c b/lustre/mds/mds_lmv.c index a49bf89..54134df 100644 --- a/lustre/mds/mds_lmv.c +++ b/lustre/mds/mds_lmv.c @@ -411,6 +411,46 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry, #define MAX_DIR_SIZE (64 * 1024) +int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry) +{ + struct mds_obd *mds = &obd->u.mds; + struct mea *mea = NULL; + int rc, size; + + /* clustered MD ? */ + if (!mds->mds_lmv_obd) + RETURN(0); + + /* inode exist? */ + if (dentry->d_inode == NULL) + return 0; + + /* a dir can be splitted only */ + if (!S_ISDIR(dentry->d_inode->i_mode)) + return 0; + + /* large enough to be splitted? */ + if (dentry->d_inode->i_size < MAX_DIR_SIZE) + return 0; + + /* don't split root directory */ + if (dentry->d_inode->i_ino == mds->mds_rootfid.id) + return 0; + + mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size); + if (mea) { + /* already splitted or slave object: shouldn't be splitted */ + rc = 0; + } else { + /* may be splitted */ + rc = 1; + } + + if (mea) + OBD_FREE(mea, size); + RETURN(rc); +} + /* * must not be called on already splitted directories */ @@ -425,22 +465,10 @@ int mds_try_to_split_dir(struct obd_device *obd, void *handle; ENTRY; - /* clustered MD ? 
*/ - if (!mds->mds_lmv_obd) - RETURN(0); - - /* don't split root directory */ - if (dentry->d_inode->i_ino == mds->mds_rootfid.id) - RETURN(0); - - /* we want to split only large dirs. this may be already - * splitted dir or a slave dir created during splitting */ - if (dir->i_size < MAX_DIR_SIZE) - RETURN(0); - - /* check is directory marked non-splittable */ - if (mea && *mea) + /* TODO: optimization possible - we already may have mea here */ + if (!mds_splitting_expected(obd, dentry)) RETURN(0); + LASSERT(mea == NULL || *mea == NULL); CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n", obd->obd_name, mds->mds_num, dir->i_ino, @@ -459,8 +487,6 @@ int mds_try_to_split_dir(struct obd_device *obd, RETURN(-ENOMEM); (*mea)->mea_count = nstripes; -#warning "we have to take EX lock on a dir for splitting" - /* 1) create directory objects on slave MDS'es */ /* FIXME: should this be OBD method? */ oa = obdo_alloc(); diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 47471f2..709f67c 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -784,7 +784,7 @@ int mds_open(struct mds_update_record *rec, int offset, void *handle = NULL; struct dentry_params dp; struct mea *mea = NULL; - int mea_size; + int mea_size, update_mode; ENTRY; parent_lockh[0].cookie = 0; @@ -856,7 +856,7 @@ int mds_open(struct mds_update_record *rec, int offset, } dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode, - parent_lockh, rec->ur_name, + parent_lockh, &update_mode, rec->ur_name, rec->ur_namelen - 1, MDS_INODELOCK_UPDATE); if (IS_ERR(dparent)) { @@ -922,7 +922,7 @@ got_child: NULL, 0, NULL, child_lockh); #ifdef S_PDIROPS if (parent_lockh[1].cookie != 0) - ldlm_lock_decref(parent_lockh + 1, LCK_CW); + ldlm_lock_decref(parent_lockh + 1, update_mode); #endif ldlm_lock_decref(parent_lockh, parent_mode); if (mea) @@ -1113,7 +1113,7 @@ got_child: l_dput(dparent); #ifdef S_PDIROPS if (parent_lockh[1].cookie != 0) - ldlm_lock_decref(parent_lockh + 1, LCK_CW); + 
ldlm_lock_decref(parent_lockh + 1, update_mode); #endif if (rc) ldlm_lock_decref(parent_lockh, parent_mode); diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 57f7c99..fe82cf2 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -381,6 +381,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, struct dentry *de; struct inode *inode = NULL; struct lustre_handle lockh[2] = {{0}, {0}}; + int parent_mode; void *handle = NULL; struct mds_logcancel_data *mlcd = NULL; int rc = 0, cleanup_phase = 0, err, locked = 0; @@ -402,7 +403,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, if (rec->ur_iattr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID) ) lockpart |= MDS_INODELOCK_LOOKUP; de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, - lockh, NULL, 0, lockpart); + lockh, &parent_mode, NULL, 0, lockpart); if (IS_ERR(de)) GOTO(cleanup, rc = PTR_ERR(de)); locked = 1; @@ -502,7 +503,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, if (locked) { #ifdef S_PDIROPS if (lockh[1].cookie != 0) - ldlm_lock_decref(lockh + 1, LCK_CW); + ldlm_lock_decref(lockh + 1, parent_mode); #endif if (rc) { ldlm_lock_decref(lockh, LCK_PW); @@ -557,6 +558,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, struct inode *dir = NULL; void *handle = NULL; struct lustre_handle lockh[2] = {{0}, {0}}; + int parent_mode; int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0; int created = 0; struct dentry_params dp; @@ -577,8 +579,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, GOTO(cleanup, rc = -ESTALE); dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, lockh, - rec->ur_name, rec->ur_namelen - 1, - MDS_INODELOCK_UPDATE); + &parent_mode, rec->ur_name, + rec->ur_namelen - 1, MDS_INODELOCK_UPDATE); if (IS_ERR(dparent)) { rc = PTR_ERR(dparent); CERROR("parent lookup error %d\n", rc); @@ -858,7 +860,7 @@ cleanup: case 1: /* 
locked parent dentry */ #ifdef S_PDIROPS if (lockh[1].cookie != 0) - ldlm_lock_decref(lockh + 1, LCK_CW); + ldlm_lock_decref(lockh + 1, parent_mode); #endif if (rc) { ldlm_lock_decref(lockh, LCK_PW); @@ -1185,7 +1187,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, struct ll_fid *fid, struct lustre_handle *parent_lockh, struct dentry **dparentp, int parent_mode, - __u64 parent_lockpart, + __u64 parent_lockpart, int *update_mode, char *name, int namelen, struct lustre_handle *child_lockh, struct dentry **dchildp, int child_mode, @@ -1215,23 +1217,19 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, #ifdef S_PDIROPS parent_lockh[1].cookie = 0; if (name && IS_PDIROPS((*dparentp)->d_inode)) { - /* lock just dir { ino, generation } to flush client cache */ - if (parent_mode == LCK_PW) { - struct ldlm_res_id res_id = { .name = {0} }; - ldlm_policy_data_t policy; - int flags = 0; - res_id.name[0] = (*dparentp)->d_inode->i_ino; - res_id.name[1] = (*dparentp)->d_inode->i_generation; - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - res_id, LDLM_IBITS, - &policy, LCK_CW, &flags, - mds_blocking_ast, - ldlm_completion_ast, NULL, NULL, - NULL, 0, NULL, parent_lockh+1); - if (rc != ELDLM_OK) - RETURN(-ENOLCK); - } + struct ldlm_res_id res_id = { .name = {0} }; + ldlm_policy_data_t policy; + int flags = 0; + *update_mode = mds_lock_mode_for_dir(obd, *dparentp, parent_mode); + res_id.name[0] = (*dparentp)->d_inode->i_ino; + res_id.name[1] = (*dparentp)->d_inode->i_generation; + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, + LDLM_IBITS, &policy, *update_mode, &flags, + mds_blocking_ast, ldlm_completion_ast, + NULL, NULL, NULL, 0, NULL, parent_lockh+1); + if (rc != ELDLM_OK) + RETURN(-ENOLCK); parent_res_id.name[2] = full_name_hash(name, namelen - 1); CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n", 
@@ -1312,7 +1310,7 @@ cleanup: case 1: #ifdef S_PDIROPS if (parent_lockh[1].cookie) - ldlm_lock_decref(parent_lockh + 1, LCK_CW); + ldlm_lock_decref(parent_lockh + 1, *update_mode); #endif l_dput(*dparentp); default: ; @@ -1371,7 +1369,8 @@ int mds_create_local_dentry(struct mds_update_record *rec, d_drop(new_child); child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX, - lockh, NULL, 0, MDS_INODELOCK_UPDATE); + lockh, NULL, NULL, 0, + MDS_INODELOCK_UPDATE); if (IS_ERR(child)) { CERROR("can't get victim\n"); GOTO(cleanup, rc = PTR_ERR(child)); @@ -1475,6 +1474,7 @@ static int mds_copy_unlink_reply(struct ptlrpc_request *master, static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset, struct ptlrpc_request *req, struct lustre_handle *parent_lockh, + int update_mode, struct dentry *dparent, struct lustre_handle *child_lockh, struct dentry *dchild) @@ -1510,7 +1510,7 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset, #ifdef S_PDIROPS if (parent_lockh[1].cookie != 0) - ldlm_lock_decref(parent_lockh + 1, LCK_CW); + ldlm_lock_decref(parent_lockh + 1, update_mode); #endif ldlm_lock_decref(child_lockh, LCK_EX); if (rc) @@ -1538,6 +1538,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, void *handle = NULL; int rc = 0, log_unlink = 0, cleanup_phase = 0; int unlink_by_fid = 0; + int update_mode; ENTRY; LASSERT(offset == 0 || offset == 2); @@ -1559,7 +1560,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, } rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, parent_lockh, &dparent, LCK_PW, - MDS_INODELOCK_UPDATE, + MDS_INODELOCK_UPDATE, &update_mode, rec->ur_name, rec->ur_namelen, &child_lockh, &dchild, LCK_EX, MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE); @@ -1571,7 +1572,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, LASSERT(unlink_by_fid == 0); LASSERT(dchild->d_mdsnum != mds->mds_num); mds_reint_unlink_remote(rec, offset, req, 
parent_lockh, - dparent, &child_lockh, dchild); + update_mode, dparent, &child_lockh, dchild); RETURN(0); } @@ -1735,7 +1736,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, case 1: /* child and parent dentry, parent lock */ #ifdef S_PDIROPS if (parent_lockh[1].cookie != 0) - ldlm_lock_decref(parent_lockh + 1, LCK_CW); + ldlm_lock_decref(parent_lockh + 1, update_mode); #endif if (rc) ldlm_lock_decref(parent_lockh, LCK_PW); @@ -1843,6 +1844,7 @@ static int mds_reint_link_to_remote(struct mds_update_record *rec, int rc = 0, cleanup_phase = 0; struct mdc_op_data op_data; struct ptlrpc_request *request = NULL; + int update_mode; ENTRY; #define fmt "%s: request to link %u/%u/%u:%*s to foreign inode %u/%u/%u\n" @@ -1856,8 +1858,8 @@ static int mds_reint_link_to_remote(struct mds_update_record *rec, (unsigned)rec->ur_fid1->generation); de_tgt_dir = mds_fid2locked_dentry(obd, rec->ur_fid2, NULL, LCK_EX, - tgt_dir_lockh, rec->ur_name, - rec->ur_namelen - 1, + tgt_dir_lockh, &update_mode, + rec->ur_name, rec->ur_namelen - 1, MDS_INODELOCK_UPDATE); if (IS_ERR(de_tgt_dir)) GOTO(cleanup, rc = PTR_ERR(de_tgt_dir)); @@ -1903,12 +1905,12 @@ cleanup: if (rc) { ldlm_lock_decref(tgt_dir_lockh, LCK_EX); #ifdef S_PDIROPS - ldlm_lock_decref(tgt_dir_lockh + 1, LCK_CW); + ldlm_lock_decref(tgt_dir_lockh + 1, update_mode); #endif } else { ptlrpc_save_lock(req, tgt_dir_lockh, LCK_EX); #ifdef S_PDIROPS - ptlrpc_save_lock(req, tgt_dir_lockh + 1, LCK_CW); + ptlrpc_save_lock(req, tgt_dir_lockh+1, update_mode); #endif } l_dput(de_tgt_dir); @@ -1937,8 +1939,8 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, ldlm_policy_data_t src_policy ={.l_inodebits = {MDS_INODELOCK_UPDATE}}; ldlm_policy_data_t tgt_dir_policy = {.l_inodebits = {MDS_INODELOCK_UPDATE}}; - int rc = 0, cleanup_phase = 0; + int update_mode = 0; ENTRY; LASSERT(offset == 0); @@ -1991,10 +1993,10 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, #ifdef S_PDIROPS if 
(IS_PDIROPS(de_tgt_dir->d_inode)) { int flags = 0; - /* Get a temp lock on just ino, gen to flush client cache */ + update_mode = mds_lock_mode_for_dir(obd, de_tgt_dir, LCK_EX); rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, tgt_dir_res_id, LDLM_IBITS, &src_policy, - LCK_CW, &flags, mds_blocking_ast, + update_mode, &flags, mds_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, tgt_dir_lockh + 1); if (rc != ELDLM_OK) @@ -2065,8 +2067,8 @@ cleanup: } case 2: /* target dentry */ #ifdef S_PDIROPS - if (tgt_dir_lockh[1].cookie) - ldlm_lock_decref(tgt_dir_lockh + 1, LCK_CW); + if (tgt_dir_lockh[1].cookie && update_mode) + ldlm_lock_decref(tgt_dir_lockh + 1, update_mode); #endif if (de_tgt_dir) l_dput(de_tgt_dir); @@ -2216,10 +2218,11 @@ static int mds_get_parents_children_locked(struct obd_device *obd, dlm_handles[5].cookie = 0; dlm_handles[6].cookie = 0; if (IS_PDIROPS((*de_srcdirp)->d_inode)) { - /* Get a temp lock on just ino, gen to flush client cache */ + /* Get a temp lock on just ino, gen to flush client cache and + * to protect dirs from concurrent splitting */ rc = enqueue_ordered_locks(obd, &p1_res_id, &(dlm_handles[5]), - LCK_CW, &p_policy, &p2_res_id, - &(dlm_handles[6]),LCK_CW,&p_policy); + LCK_PW, &p_policy, &p2_res_id, + &(dlm_handles[6]),LCK_PW,&p_policy); if (rc != ELDLM_OK) GOTO(cleanup, rc); @@ -2376,7 +2379,7 @@ static int mds_reint_rename_create_name(struct mds_update_record *rec, struct lustre_handle child_lockh = {0}; int cleanup_phase = 0; void *handle = NULL; - int rc = 0; + int update_mode, rc = 0; ENTRY; /* another MDS executing rename operation has asked us @@ -2393,7 +2396,7 @@ static int mds_reint_rename_create_name(struct mds_update_record *rec, child_lockh.cookie = 0; rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid2, parent_lockh, &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE, - rec->ur_tgt, rec->ur_tgtlen, + &update_mode, rec->ur_tgt, rec->ur_tgtlen, &child_lockh, &de_new, LCK_EX, MDS_INODELOCK_LOOKUP); if (rc) @@ 
-2449,7 +2452,7 @@ cleanup: case 1: #ifdef S_PDIROPS if (parent_lockh[1].cookie != 0) - ldlm_lock_decref(&parent_lockh[1], LCK_CW); + ldlm_lock_decref(&parent_lockh[1], update_mode); #endif ldlm_lock_decref(&parent_lockh[0], LCK_PW); if (child_lockh.cookie != 0) @@ -2478,7 +2481,7 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset, struct lustre_handle child_lockh = {0}; struct mdc_op_data opdata; void *handle = NULL; - int rc = 0; + int update_mode, rc = 0; ENTRY; CDEBUG(D_OTHER, "%s: move name %s onto another mds%u\n", @@ -2488,9 +2491,9 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset, child_lockh.cookie = 0; rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, parent_lockh, &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE, - rec->ur_name, rec->ur_namelen, - &child_lockh, &de_old, LCK_EX, - MDS_INODELOCK_LOOKUP); + &update_mode, rec->ur_name, + rec->ur_namelen, &child_lockh, &de_old, + LCK_EX, MDS_INODELOCK_LOOKUP); LASSERT(rc == 0); LASSERT(de_srcdir); LASSERT(de_srcdir->d_inode); @@ -2539,7 +2542,7 @@ cleanup: #ifdef S_PDIROPS if (parent_lockh[1].cookie != 0) - ldlm_lock_decref(&parent_lockh[1], LCK_CW); + ldlm_lock_decref(&parent_lockh[1], update_mode); #endif ldlm_lock_decref(&parent_lockh[0], LCK_PW); if (child_lockh.cookie != 0) @@ -2669,9 +2672,9 @@ cleanup: case 1: #ifdef S_PDIROPS if (dlm_handles[5].cookie != 0) - ldlm_lock_decref(&(dlm_handles[5]), LCK_CW); + ldlm_lock_decref(&(dlm_handles[5]), LCK_PW); if (dlm_handles[6].cookie != 0) - ldlm_lock_decref(&(dlm_handles[6]), LCK_CW); + ldlm_lock_decref(&(dlm_handles[6]), LCK_PW); #endif if (rc) { if (lock_count == 4) -- 1.8.3.1