Whamcloud - gitweb
b=2837
authoralex <alex>
Wed, 16 Jun 2004 07:40:59 +0000 (07:40 +0000)
committeralex <alex>
Wed, 16 Jun 2004 07:40:59 +0000 (07:40 +0000)
- minor optimization: MDS doesn't take UPDATE lock to protect
  from splitting if splitting isn't possible (slave objects)

lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_lmv.c
lustre/mds/mds_reint.c

index 9236e3d..46f0ef3 100644 (file)
@@ -158,7 +158,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 int mds_lock_mode_for_dir(struct obd_device *obd,
                               struct dentry *dentry, int mode)
 {
-        int ret_mode;
+        int ret_mode, split;
 
         /* any dir access needs couple locks:
          * 1) on part of dir we gonna lookup/modify in
@@ -174,23 +174,32 @@ int mds_lock_mode_for_dir(struct obd_device *obd,
          *    for splitting then we need to protect it from any
          *    type of access (lookup/modify/split) - LCK_EX -bzzz */
 
-        if (mode == LCK_PR) {
-                ret_mode = LCK_CR;
-        } else if (mode == LCK_PW) {
-                /* caller gonna modify directory.we use concurrent
-                   write lock here to retract client's cache for readdir */
-                ret_mode = LCK_CW;
-                if (mds_splitting_expected(obd, dentry)) {
-                        /* splitting possible. serialize any access */
-                        CDEBUG(D_OTHER, "%s: gonna split %lu/%lu\n",
-                               obd->obd_name,
-                               (unsigned long) dentry->d_inode->i_ino,
-                               (unsigned long) dentry->d_inode->i_generation);
+        split = mds_splitting_expected(obd, dentry);
+        if (split == MDS_NO_SPLITTABLE) {
+                /* this inode won't be splitted. so we need not to protect from
+                 * just flush client's cache on modification */
+                ret_mode = 0;
+                if (mode == LCK_PW)
+                        ret_mode = LCK_CW;
+        } else {
+                if (mode == LCK_PR) {
+                        ret_mode = LCK_CR;
+                } else if (mode == LCK_PW) {
+                        /* caller gonna modify directory.we use concurrent
+                           write lock here to retract client's cache for readdir */
+                        ret_mode = LCK_CW;
+                        if (split == MDS_EXPECT_SPLIT) {
+                                /* splitting possible. serialize any access */
+                                CDEBUG(D_OTHER, "%s: gonna split %u/%u\n",
+                                       obd->obd_name,
+                                       (unsigned) dentry->d_inode->i_ino,
+                                       (unsigned) dentry->d_inode->i_generation);
+                                ret_mode = LCK_EX;
+                        }
+                } else {
+                        CWARN("unexpected lock mode %d\n", mode);
                         ret_mode = LCK_EX;
                 }
-        } else {
-                CWARN("unexpected lock mode %d\n", mode);
-                ret_mode = LCK_EX;
         }
         return ret_mode;
 }
@@ -221,15 +230,17 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                         { .l_inodebits = { MDS_INODELOCK_UPDATE } };
                 LASSERT(mode != NULL);
                 *mode = mds_lock_mode_for_dir(obd, de, lock_mode);
-                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                res_id, LDLM_IBITS,
-                                &cpolicy, *mode, &flags,
-                                mds_blocking_ast,
-                                ldlm_completion_ast, NULL, NULL,
-                                NULL, 0, NULL, lockh + 1);
-                if (rc != ELDLM_OK) {
-                        l_dput(de);
-                        RETURN(ERR_PTR(-ENOLCK));
+                if (*mode) {
+                        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                                              res_id, LDLM_IBITS,
+                                              &cpolicy, *mode, &flags,
+                                              mds_blocking_ast,
+                                              ldlm_completion_ast, NULL, NULL,
+                                              NULL, 0, NULL, lockh + 1);
+                        if (rc != ELDLM_OK) {
+                                l_dput(de);
+                                RETURN(ERR_PTR(-ENOLCK));
+                        }
                 }
                 flags = 0;
 
index d02429d..ce3d1b8 100644 (file)
@@ -17,6 +17,10 @@ struct mds_filter_data {
 #define DENTRY_VALID(dentry)    \
         ((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF))
 
+#define MDS_NO_SPLIT_EXPECTED   0
+#define MDS_EXPECT_SPLIT        1
+#define MDS_NO_SPLITTABLE       2
+
 static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
 {
         return &req->rq_export->exp_obd->u.mds;
index ccd4ec7..92975d8 100644 (file)
@@ -485,31 +485,31 @@ int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
 
        /* clustered MD ? */
        if (!mds->mds_lmv_obd)
-               RETURN(0);
+               RETURN(MDS_NO_SPLITTABLE);
 
         /* inode exist? */
         if (dentry->d_inode == NULL)
-                return 0;
+                return MDS_NO_SPLITTABLE;
 
         /* a dir can be splitted only */
         if (!S_ISDIR(dentry->d_inode->i_mode))
-                return 0;
-
-        /* large enough to be splitted? */
-        if (dentry->d_inode->i_size < MAX_DIR_SIZE)
-                return 0;
+                return MDS_NO_SPLITTABLE;
 
         /* don't split root directory */
         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
-                return 0;
+                return MDS_NO_SPLITTABLE;
+
+        /* large enough to be splitted? */
+        if (dentry->d_inode->i_size < MAX_DIR_SIZE)
+                return MDS_NO_SPLIT_EXPECTED;
 
         mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
         if (mea) {
                 /* already splitted or slave object: shouldn't be splitted */
-                rc = 0;
+                rc = MDS_NO_SPLITTABLE;
         } else {
                 /* may be splitted */
-                rc = 1;
+                rc = MDS_EXPECT_SPLIT;
         }
 
         if (mea)
@@ -532,8 +532,8 @@ int mds_try_to_split_dir(struct obd_device *obd,
        ENTRY;
 
         /* TODO: optimization possible - we already may have mea here */
-        if (!mds_splitting_expected(obd, dentry))
-                RETURN(0);
+        if (mds_splitting_expected(obd, dentry) != MDS_EXPECT_SPLIT)
+                return 0;
         LASSERT(mea == NULL || *mea == NULL);
 
         CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n",
index e713ae6..9e9ab01 100644 (file)
@@ -1221,15 +1221,20 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                 ldlm_policy_data_t policy;
                 int flags = 0;
                 *update_mode = mds_lock_mode_for_dir(obd, *dparentp, parent_mode);
-                res_id.name[0] = (*dparentp)->d_inode->i_ino;
-                res_id.name[1] = (*dparentp)->d_inode->i_generation;
-                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
-                                      LDLM_IBITS, &policy, *update_mode, &flags,
-                                      mds_blocking_ast, ldlm_completion_ast,
-                                      NULL, NULL, NULL, 0, NULL, parent_lockh+1);
-                if (rc != ELDLM_OK)
-                        RETURN(-ENOLCK);
+                if (*update_mode) {
+                        res_id.name[0] = (*dparentp)->d_inode->i_ino;
+                        res_id.name[1] = (*dparentp)->d_inode->i_generation;
+                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                                              res_id, LDLM_IBITS, &policy,
+                                              *update_mode, &flags,
+                                              mds_blocking_ast,
+                                              ldlm_completion_ast,
+                                              NULL, NULL, NULL, 0, NULL,
+                                              parent_lockh + 1);
+                        if (rc != ELDLM_OK)
+                                RETURN(-ENOLCK);
+                }
 
                 parent_res_id.name[2] = full_name_hash(name, namelen - 1);
                 CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
@@ -2059,13 +2064,16 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         if (IS_PDIROPS(de_tgt_dir->d_inode)) {
                 int flags = 0;
                 update_mode = mds_lock_mode_for_dir(obd, de_tgt_dir, LCK_EX);
-                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                      tgt_dir_res_id, LDLM_IBITS, &src_policy,
-                                      update_mode, &flags, mds_blocking_ast,
-                                      ldlm_completion_ast, NULL, NULL,
-                                      NULL, 0, NULL, tgt_dir_lockh + 1);
-                if (rc != ELDLM_OK)
-                        GOTO(cleanup, rc = -ENOLCK);
+                if (update_mode) {
+                        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                                              tgt_dir_res_id, LDLM_IBITS,
+                                              &src_policy, update_mode, &flags,
+                                              mds_blocking_ast,
+                                              ldlm_completion_ast, NULL, NULL,
+                                              NULL, 0, NULL, tgt_dir_lockh + 1);
+                        if (rc != ELDLM_OK)
+                                GOTO(cleanup, rc = -ENOLCK);
+                }
 
                 tgt_dir_res_id.name[2] = full_name_hash(rec->ur_name,
                                                         rec->ur_namelen - 1);