Whamcloud - gitweb
changes to protect directory splitting from concurrent modifies/lookups:
[fs/lustre-release.git] / lustre / mds / handler.c
index ec6cd9f..5ffc3f5 100644 (file)
@@ -155,10 +155,50 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
         return rc;
 }
 
+int mds_lock_mode_for_dir(struct obd_device *obd,
+                              struct dentry *dentry, int mode)
+{
+        int ret_mode;
+
+        /* any dir access needs couple locks:
+         * 1) on part of dir we gonna lookup/modify in
+         * 2) on a whole dir to protect it from concurrent splitting
+         *    and to flush client's cache for readdir()
+         * so, for a given mode and dentry this routine decides what
+         * lock mode to use for lock #2:
+         * 1) if caller's gonna lookup in dir then we need to protect
+         *    dir from being splitted only - LCK_CR
+         * 2) if caller's gonna modify dir then we need to protect
+         *    dir from being splitted and to flush cache - LCK_CW
+         * 3) if caller's gonna modify dir and that dir seems ready
+         *    for splitting then we need to protect it from any
+         *    type of access (lookup/modify/split) - LCK_EX -bzzz */
+
+        if (mode == LCK_PR) {
+                ret_mode = LCK_CR;
+        } else if (mode == LCK_PW) {
+                /* caller gonna modify directory.we use concurrent
+                   write lock here to retract client's cache for readdir */
+                ret_mode = LCK_CW;
+                if (mds_splitting_expected(obd, dentry)) {
+                        /* splitting possible. serialize any access */
+                        CERROR("%s: gonna split %lu/%lu\n",
+                               obd->obd_name,
+                               (unsigned long) dentry->d_inode->i_ino,
+                               (unsigned long) dentry->d_inode->i_generation);
+                        ret_mode = LCK_EX;
+                }
+        } else {
+                CWARN("unexpected lock mode %d\n", mode);
+                ret_mode = LCK_EX;
+        }
+        return ret_mode;
+}
+
 /* only valid locked dentries or errors should be returned */
 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
-                                     struct lustre_handle *lockh,
+                                     struct lustre_handle *lockh, int *mode,
                                      char *name, int namelen, __u64 lockpart)
 {
         struct mds_obd *mds = &obd->u.mds;
@@ -174,27 +214,27 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
 
         res_id.name[0] = de->d_inode->i_ino;
         res_id.name[1] = de->d_inode->i_generation;
-#ifdef S_PDIROPS
         lockh[1].cookie = 0;
+#ifdef S_PDIROPS
         if (name && IS_PDIROPS(de->d_inode)) {
                 ldlm_policy_data_t cpolicy =
                         { .l_inodebits = { MDS_INODELOCK_UPDATE } };
-                /* lock just dir { ino, generation } to flush client cache */
-                if (lock_mode == LCK_PW) {
-                        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                              res_id, LDLM_IBITS,
-                                              &cpolicy, LCK_CW, &flags,
-                                              mds_blocking_ast,
-                                              ldlm_completion_ast, NULL, NULL,
-                                              NULL, 0, NULL, lockh + 1);
-                        if (rc != ELDLM_OK) {
-                                l_dput(de);
-                                RETURN(ERR_PTR(-ENOLCK));
-                        }
-                       flags = 0;
+                LASSERT(mode != NULL);
+                *mode = mds_lock_mode_for_dir(obd, de, lock_mode);
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                                res_id, LDLM_IBITS,
+                                &cpolicy, *mode, &flags,
+                                mds_blocking_ast,
+                                ldlm_completion_ast, NULL, NULL,
+                                NULL, 0, NULL, lockh + 1);
+                if (rc != ELDLM_OK) {
+                        l_dput(de);
+                        RETURN(ERR_PTR(-ENOLCK));
                 }
+                flags = 0;
 
                 res_id.name[2] = full_name_hash(name, namelen);
+
                 CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
                        de->d_inode->i_ino, de->d_inode->i_generation,
                        res_id.name[2]);
@@ -763,7 +803,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         struct dentry *dparent = NULL, *dchild = NULL;
         struct lvfs_ucred uc;
         struct lustre_handle parent_lockh[2];
-        int namesize;
+        int namesize, update_mode;
         int rc = 0, cleanup_phase = 0, resent_req = 0;
         char *name;
         ENTRY;
@@ -811,7 +851,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                        obd->obd_name, (unsigned long) body->fid1.id,
                        (unsigned long) body->fid1.generation);
                 dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
-                                               parent_lockh, NULL, 0, child_part);
+                                               parent_lockh, &update_mode, 
+                                               NULL, 0, child_part);
                 if (IS_ERR(dchild)) {
                         CERROR("can't find inode: %d\n", (int) PTR_ERR(dchild));
                         GOTO(cleanup, rc = PTR_ERR(dchild));
@@ -819,7 +860,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                 memcpy(child_lockh, parent_lockh, sizeof(parent_lockh[0]));
 #ifdef S_PDIROPS
                 if (parent_lockh[1].cookie)
-                        ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+                        ldlm_lock_decref(parent_lockh + 1, update_mode);
 #endif
                 cleanup_phase = 2;
                 goto fill_inode;
@@ -857,7 +898,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
                                                  parent_lockh, &dparent,
                                                  LCK_PR, MDS_INODELOCK_LOOKUP,
-                                                 name, namesize,
+                                                 &update_mode, name, namesize,
                                                  child_lockh, &dchild, LCK_PR,
                                                  child_part);
                 if (rc)
@@ -915,7 +956,7 @@ fill_inode:
 #ifdef S_PDIROPS
                                 if (parent_lockh[1].cookie != 0)
                                         ldlm_lock_decref(parent_lockh + 1,
-                                                        LCK_CW);
+                                                         update_mode);
 #endif
                         }
                         if (dparent)