int mds_lock_mode_for_dir(struct obd_device *obd,
struct dentry *dentry, int mode)
{
- int ret_mode;
+ int ret_mode, split;
/* any dir access needs couple locks:
* 1) on part of dir we gonna lookup/modify in
* for splitting then we need to protect it from any
* type of access (lookup/modify/split) - LCK_EX -bzzz */
- if (mode == LCK_PR) {
- ret_mode = LCK_CR;
- } else if (mode == LCK_PW) {
- /* caller gonna modify directory.we use concurrent
- write lock here to retract client's cache for readdir */
- ret_mode = LCK_CW;
- if (mds_splitting_expected(obd, dentry)) {
- /* splitting possible. serialize any access */
- CDEBUG(D_OTHER, "%s: gonna split %lu/%lu\n",
- obd->obd_name,
- (unsigned long) dentry->d_inode->i_ino,
- (unsigned long) dentry->d_inode->i_generation);
+ split = mds_splitting_expected(obd, dentry);
+ if (split == MDS_NO_SPLITTABLE) {
+ /* this inode won't be splitted. so we need not to protect from
+ * just flush client's cache on modification */
+ ret_mode = 0;
+ if (mode == LCK_PW)
+ ret_mode = LCK_CW;
+ } else {
+ if (mode == LCK_PR) {
+ ret_mode = LCK_CR;
+ } else if (mode == LCK_PW) {
+ /* caller gonna modify directory.we use concurrent
+ write lock here to retract client's cache for readdir */
+ ret_mode = LCK_CW;
+ if (split == MDS_EXPECT_SPLIT) {
+ /* splitting possible. serialize any access */
+ CDEBUG(D_OTHER, "%s: gonna split %u/%u\n",
+ obd->obd_name,
+ (unsigned) dentry->d_inode->i_ino,
+ (unsigned) dentry->d_inode->i_generation);
+ ret_mode = LCK_EX;
+ }
+ } else {
+ CWARN("unexpected lock mode %d\n", mode);
ret_mode = LCK_EX;
}
- } else {
- CWARN("unexpected lock mode %d\n", mode);
- ret_mode = LCK_EX;
}
return ret_mode;
}
{ .l_inodebits = { MDS_INODELOCK_UPDATE } };
LASSERT(mode != NULL);
*mode = mds_lock_mode_for_dir(obd, de, lock_mode);
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
- res_id, LDLM_IBITS,
- &cpolicy, *mode, &flags,
- mds_blocking_ast,
- ldlm_completion_ast, NULL, NULL,
- NULL, 0, NULL, lockh + 1);
- if (rc != ELDLM_OK) {
- l_dput(de);
- RETURN(ERR_PTR(-ENOLCK));
+ if (*mode) {
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ res_id, LDLM_IBITS,
+ &cpolicy, *mode, &flags,
+ mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, lockh + 1);
+ if (rc != ELDLM_OK) {
+ l_dput(de);
+ RETURN(ERR_PTR(-ENOLCK));
+ }
}
flags = 0;
#define DENTRY_VALID(dentry) \
((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF))
+#define MDS_NO_SPLIT_EXPECTED 0
+#define MDS_EXPECT_SPLIT 1
+#define MDS_NO_SPLITTABLE 2
+
static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
{
return &req->rq_export->exp_obd->u.mds;
/* clustered MD ? */
if (!mds->mds_lmv_obd)
- RETURN(0);
+ RETURN(MDS_NO_SPLITTABLE);
/* inode exist? */
if (dentry->d_inode == NULL)
- return 0;
+ return MDS_NO_SPLITTABLE;
/* a dir can be splitted only */
if (!S_ISDIR(dentry->d_inode->i_mode))
- return 0;
-
- /* large enough to be splitted? */
- if (dentry->d_inode->i_size < MAX_DIR_SIZE)
- return 0;
+ return MDS_NO_SPLITTABLE;
/* don't split root directory */
if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
- return 0;
+ return MDS_NO_SPLITTABLE;
+
+ /* large enough to be splitted? */
+ if (dentry->d_inode->i_size < MAX_DIR_SIZE)
+ return MDS_NO_SPLIT_EXPECTED;
mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
if (mea) {
/* already splitted or slave object: shouldn't be splitted */
- rc = 0;
+ rc = MDS_NO_SPLITTABLE;
} else {
/* may be splitted */
- rc = 1;
+ rc = MDS_EXPECT_SPLIT;
}
if (mea)
ENTRY;
/* TODO: optimization possible - we already may have mea here */
- if (!mds_splitting_expected(obd, dentry))
- RETURN(0);
+ if (mds_splitting_expected(obd, dentry) != MDS_EXPECT_SPLIT)
+ return 0;
LASSERT(mea == NULL || *mea == NULL);
CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n",
ldlm_policy_data_t policy;
int flags = 0;
*update_mode = mds_lock_mode_for_dir(obd, *dparentp, parent_mode);
- res_id.name[0] = (*dparentp)->d_inode->i_ino;
- res_id.name[1] = (*dparentp)->d_inode->i_generation;
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
- LDLM_IBITS, &policy, *update_mode, &flags,
- mds_blocking_ast, ldlm_completion_ast,
- NULL, NULL, NULL, 0, NULL, parent_lockh+1);
- if (rc != ELDLM_OK)
- RETURN(-ENOLCK);
+ if (*update_mode) {
+ res_id.name[0] = (*dparentp)->d_inode->i_ino;
+ res_id.name[1] = (*dparentp)->d_inode->i_generation;
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ res_id, LDLM_IBITS, &policy,
+ *update_mode, &flags,
+ mds_blocking_ast,
+ ldlm_completion_ast,
+ NULL, NULL, NULL, 0, NULL,
+ parent_lockh + 1);
+ if (rc != ELDLM_OK)
+ RETURN(-ENOLCK);
+ }
parent_res_id.name[2] = full_name_hash(name, namelen - 1);
CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
if (IS_PDIROPS(de_tgt_dir->d_inode)) {
int flags = 0;
update_mode = mds_lock_mode_for_dir(obd, de_tgt_dir, LCK_EX);
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
- tgt_dir_res_id, LDLM_IBITS, &src_policy,
- update_mode, &flags, mds_blocking_ast,
- ldlm_completion_ast, NULL, NULL,
- NULL, 0, NULL, tgt_dir_lockh + 1);
- if (rc != ELDLM_OK)
- GOTO(cleanup, rc = -ENOLCK);
+ if (update_mode) {
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ tgt_dir_res_id, LDLM_IBITS,
+ &src_policy, update_mode, &flags,
+ mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, tgt_dir_lockh + 1);
+ if (rc != ELDLM_OK)
+ GOTO(cleanup, rc = -ENOLCK);
+ }
tgt_dir_res_id.name[2] = full_name_hash(rec->ur_name,
rec->ur_namelen - 1);