Whamcloud - gitweb
- added last chunk of ldlm part of pdirops, enabled by default, use --disable-pdirops...
author yury <yury>
Sun, 22 Oct 2006 18:43:45 +0000 (18:43 +0000)
committer yury <yury>
Sun, 22 Oct 2006 18:43:45 +0000 (18:43 +0000)
- fixes in mdt_is_subdir();
- added few asserts checking f_ver invariants.

21 files changed:
lustre/autoconf/lustre-core.m4
lustre/cmm/cmm_internal.h
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c
lustre/cmm/mdc_object.c
lustre/fid/fid_lib.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_fid.h
lustre/include/md_object.h
lustre/mdc/mdc_locks.c
lustre/mdd/mdd_dir.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c
lustre/obdclass/lu_object.c
lustre/osd/osd_igif.c
lustre/osd/osd_igif.h
lustre/osd/osd_oi.c

index 88a2f5a..70618f4 100644 (file)
@@ -703,7 +703,7 @@ fi
 # whether to enable quota support
 #
 AC_DEFUN([LC_CONFIG_SPLIT],
-[AC_MSG_CHECKING([whether to disable split support])
+[AC_MSG_CHECKING([whether to enable split support])
 AC_ARG_ENABLE([split], 
        AC_HELP_STRING([--disable-split],
                        [disable split support]),
@@ -715,13 +715,31 @@ fi
 ])
  
 #
+# LC_CONFIG_PDIROPS
+#
+# whether to enable PDIROPS
+#
+AC_DEFUN([LC_CONFIG_PDIROPS],
+[
+AC_MSG_CHECKING([whether to enable PDIROPS])
+AC_ARG_ENABLE([pdirops], 
+       AC_HELP_STRING([--disable-pdirops],
+                       [disable PDIROPS]),
+       [],[enable_pdirops='yes'])
+AC_MSG_RESULT([$enable_pdirops])
+if test x$enable_pdirops != xno; then
+   AC_DEFINE(CONFIG_PDIROPS, 1, [enable PDIROPS])
+fi
+])
+
+#
 # LC_CONFIG_LDISKFS
 #
 # whether to enable various ldiskfs debugs
 #
 AC_DEFUN([LC_CONFIG_LDISKFS],
 [
-AC_MSG_CHECKING([whether to disable ldiskfs asserts])
+AC_MSG_CHECKING([whether to enable ldiskfs asserts])
 AC_ARG_ENABLE([ldiskfs_asserts], 
        AC_HELP_STRING([--disable-ldiskfs-asserts],
                        [disable ldiskfs asserts]),
index b142cc6..def7915 100644 (file)
@@ -151,9 +151,10 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
                      const char *name);
 
 int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
-                         struct md_attr *ma);
+                         struct md_attr *ma, int *split);
 
 int cmm_try_to_split(const struct lu_env *env, struct md_object *mo);
+
 #endif
 
 #endif /* __KERNEL__ */
index 19f2aca..ad9818d 100644 (file)
@@ -379,13 +379,22 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env,
 #ifdef HAVE_SPLIT_SUPPORT
         {
                 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
-
+                int rc, split;
+                
+                memset(ma, 0, sizeof(*ma));
+                
                 /* 
                  * Check only if we need protection from split. If not - mdt
                  * handles other cases.
                  */
-                if (lm == MDL_PW &&
-                    cmm_expect_splitting(env, mo, ma) == CMM_EXPECT_SPLIT)
+                rc = cmm_expect_splitting(env, mo, ma, &split);
+                if (rc) {
+                        CERROR("Can't check for possible split, error %d\n",
+                               rc);
+                        RETURN(MDL_MINMODE);
+                }
+                
+                if (lm == MDL_PW && split == CMM_EXPECT_SPLIT)
                         RETURN(MDL_EX);
         }
 #endif
index d5e032c..a7ad53f 100644 (file)
@@ -79,12 +79,12 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
                 /* Get LMV EA */
                 ma->ma_need = MA_LMV;
                 rc = mo_attr_get(env, mp, ma);
+                
                 /* Skip checking the slave dirs (mea_count is 0) */
                 if (rc == 0 && ma->ma_lmv->mea_count != 0) {
                         /* 
-                         * Get stripe by name to check the name
-                         * belongs to master dir, otherwise
-                         * return the -ERESTART
+                         * Get stripe by name to check the name belongs to
+                         * master dir, otherwise return the -ERESTART
                          */
                         stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
                 
@@ -98,42 +98,50 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
 }
 
 int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
-                         struct md_attr *ma)
+                         struct md_attr *ma, int *split)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct lu_fid *fid = NULL;
-        int rc = CMM_EXPECT_SPLIT;
+        struct lu_fid root_fid;
+        int rc;
         ENTRY;
 
-        if (cmm->cmm_tgt_count == 0)
-                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+        /* 
+         * Check the cheapest things first, like tgt count and root fid. In
+         * some cases this ordering should yield better performance.
+         */
+        if (cmm->cmm_tgt_count == 0) {
+                *split = CMM_NO_SPLIT_EXPECTED;
+                RETURN(0);
+        }
 
-        ma->ma_need = MA_INODE | MA_LMV;
-        rc = mo_attr_get(env, mo, ma);
+        rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
+                                              &root_fid);
         if (rc)
-                GOTO(cleanup, rc = CMM_NOT_SPLITTABLE);
+                RETURN(rc);
 
-        if (ma->ma_attr.la_size < CMM_SPLIT_SIZE)
-                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+        if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
+                *split = CMM_NOT_SPLITTABLE;
+                RETURN(0);
+        }
 
-        if (ma->ma_lmv_size)
-                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
-        
-        OBD_ALLOC_PTR(fid);
-        rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid);
+        /* MA_INODE is needed to check inode size. */
+        ma->ma_need = MA_INODE | MA_LMV;
+        rc = mo_attr_get(env, mo, ma);
         if (rc)
-                GOTO(cleanup, rc);
-
-        rc = CMM_EXPECT_SPLIT;
-
-        if (lu_fid_eq(fid, cmm2fid(md2cmm_obj(mo))))
-                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
-
-        EXIT;
-cleanup:
-        if (fid)
-                OBD_FREE_PTR(fid);
-        return rc;
+                RETURN(rc);
+        
+        if (ma->ma_valid & MA_LMV) {
+                *split = CMM_NOT_SPLITTABLE;
+                RETURN(0);
+        }
+                
+        if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
+                *split = CMM_NO_SPLIT_EXPECTED;
+                RETURN(0);
+        }
+        
+        *split = CMM_EXPECT_SPLIT;
+        RETURN(0);
 }
 
 #define cmm_md_size(stripes) \
@@ -496,17 +504,20 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
         struct lu_buf *buf;
-        int rc = 0;
+        int rc = 0, split;
         ENTRY;
 
         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
         memset(ma, 0, sizeof(*ma));
 
         /* Step1: Checking whether the dir needs to be split. */
-        rc = cmm_expect_splitting(env, mo, ma);
-        if (rc != CMM_EXPECT_SPLIT)
+        rc = cmm_expect_splitting(env, mo, ma, &split);
+        if (rc)
+                GOTO(cleanup, rc);
+        
+        if (split != CMM_EXPECT_SPLIT)
                 GOTO(cleanup, rc = 0);
-
+        
         /*
          * Disable trans for splitting, since there will be so many trans in
          * this one ops, confilct with current recovery design.
index 7d3c706..80ab880 100644 (file)
@@ -487,6 +487,7 @@ static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
                 CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "
                        DFID"\n", PFID(&body->fid1));
                 *sfid = body->fid1;
+                rc = -EREMOTE;
         }
         EXIT;
 out:
index ac81fd7..ac2d921 100644 (file)
@@ -73,6 +73,7 @@ void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src)
         CLASSERT(sizeof *src ==
                  sizeof fid_seq(src) +
                  sizeof fid_oid(src) + sizeof fid_ver(src));
+        LASSERT(fid_is_igif(src) || fid_ver(src) == 0);
         dst->f_seq = cpu_to_le64(fid_seq(src));
         dst->f_oid = cpu_to_le32(fid_oid(src));
         dst->f_ver = cpu_to_le32(fid_ver(src));
@@ -88,6 +89,7 @@ void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src)
         dst->f_seq = le64_to_cpu(fid_seq(src));
         dst->f_oid = le32_to_cpu(fid_oid(src));
         dst->f_ver = le32_to_cpu(fid_ver(src));
+        LASSERT(fid_is_igif(dst) || fid_ver(dst) == 0);
 }
 EXPORT_SYMBOL(fid_le_to_cpu);
 
@@ -98,6 +100,7 @@ void fid_cpu_to_be(struct lu_fid *dst, const struct lu_fid *src)
         CLASSERT(sizeof *src ==
                  sizeof fid_seq(src) +
                  sizeof fid_oid(src) + sizeof fid_ver(src));
+        LASSERT(fid_is_igif(src) || fid_ver(src) == 0);
         dst->f_seq = cpu_to_be64(fid_seq(src));
         dst->f_oid = cpu_to_be32(fid_oid(src));
         dst->f_ver = cpu_to_be32(fid_ver(src));
@@ -113,6 +116,7 @@ void fid_be_to_cpu(struct lu_fid *dst, const struct lu_fid *src)
         dst->f_seq = be64_to_cpu(fid_seq(src));
         dst->f_oid = be32_to_cpu(fid_oid(src));
         dst->f_ver = be32_to_cpu(fid_ver(src));
+        LASSERT(fid_is_igif(dst) || fid_ver(dst) == 0);
 }
 EXPORT_SYMBOL(fid_be_to_cpu);
 #endif
@@ -162,43 +166,3 @@ void range_be_to_cpu(struct lu_range *dst, const struct lu_range *src)
 }
 EXPORT_SYMBOL(range_be_to_cpu);
 #endif
-
-/* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
-int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
-             struct lustre_handle *lh, ldlm_mode_t mode,
-             ldlm_policy_data_t *policy,
-             struct ldlm_res_id *res_id)
-{
-        int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
-        int rc;
-
-        LASSERT(ns != NULL);
-        LASSERT(lh != NULL);
-        LASSERT(f != NULL);
-
-        rc = ldlm_cli_enqueue_local(ns, *fid_build_res_name(f, res_id),
-                                    LDLM_IBITS, policy, mode, &flags,
-                                    ldlm_blocking_ast, ldlm_completion_ast,
-                                    NULL, NULL, 0, NULL, lh);
-        return rc == ELDLM_OK ? 0 : -EIO;
-}
-EXPORT_SYMBOL(fid_lock);
-
-void fid_unlock(const struct lu_fid *f,
-                struct lustre_handle *lh, ldlm_mode_t mode)
-{
-        {
-                /* XXX: this is debug stuff, remove it later. */
-                struct ldlm_lock *lock = ldlm_handle2lock(lh);
-                if (!lock) {
-                        CERROR("Invalid lock handle "LPX64"\n",
-                               lh->cookie);
-                        LBUG();
-                }
-                LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
-                LDLM_LOCK_PUT(lock);
-        }
-        ldlm_lock_decref(lh, mode);
-}
-EXPORT_SYMBOL(fid_unlock);
-
index ddcbd80..923e085 100644 (file)
@@ -246,6 +246,11 @@ static inline int fid_is_zero(const struct lu_fid *fid)
         return fid_seq(fid) == 0 && fid_oid(fid) == 0;
 }
 
+static inline int fid_is_igif(const struct lu_fid *fid)
+{
+        return fid_seq(fid) == LUSTRE_ROOT_FID_SEQ;
+}
+
 #define DFID "[0x%16.16"LPF64"x/0x%8.8x:0x%8.8x]"
 
 #define PFID(fid)     \
@@ -259,9 +264,11 @@ extern void lustre_swab_lu_range(struct lu_range *range);
 static inline int lu_fid_eq(const struct lu_fid *f0,
                             const struct lu_fid *f1)
 {
-       /* check that there is no alignment padding */
+       /* Check that there is no alignment padding. */
        CLASSERT(sizeof *f0 ==
                  sizeof f0->f_seq + sizeof f0->f_oid + sizeof f0->f_ver);
+        LASSERT(fid_is_igif(f0) || fid_ver(f0) == 0);
+        LASSERT(fid_is_igif(f1) || fid_ver(f1) == 0);
        return memcmp(f0, f1, sizeof *f0) == 0;
 }
 
index febb8b9..ec666f8 100644 (file)
@@ -200,24 +200,28 @@ void fid_be_to_cpu(struct lu_fid *dst, const struct lu_fid *src);
 
 struct ldlm_namespace;
 
-int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
-             struct lustre_handle *lh, ldlm_mode_t mode,
-             ldlm_policy_data_t *policy,
-             struct ldlm_res_id *res_id);
-void fid_unlock(const struct lu_fid *f,
-                struct lustre_handle *lh, ldlm_mode_t mode);
-
 /*
  * Build (DLM) resource name from fid.
  */
 static inline struct ldlm_res_id *
-fid_build_res_name(const struct lu_fid *f,
-                   struct ldlm_res_id *name)
+fid_build_reg_res_name(const struct lu_fid *f,
+                       struct ldlm_res_id *name)
 {
         memset(name, 0, sizeof *name);
         name->name[0] = fid_seq(f);
         name->name[1] = fid_oid(f);
         name->name[2] = fid_ver(f);
+        name->name[3] = 0ull;
+        return name;
+}
+
+static inline struct ldlm_res_id *
+fid_build_pdo_res_name(const struct lu_fid *f,
+                       unsigned int hash,
+                       struct ldlm_res_id *name)
+{
+        fid_build_reg_res_name(f, name);
+        name->name[3] = hash;
         return name;
 }
 
index d907a9b..183096f 100644 (file)
@@ -93,26 +93,32 @@ struct md_capainfo *md_capainfo(const struct lu_env *env);
 
 /* metadata attributes */
 enum ma_valid {
-        MA_INODE    = (1 << 0),
-        MA_LOV      = (1 << 1),
-        MA_COOKIE   = (1 << 2),
-        MA_FLAGS    = (1 << 3),
-        MA_LMV      = (1 << 4),
-        MA_ACL_DEF  = (1 << 5)
+        MA_INODE     = (1 << 0),
+        MA_LOV       = (1 << 1),
+        MA_COOKIE    = (1 << 2),
+        MA_FLAGS     = (1 << 3),
+        MA_LMV       = (1 << 4),
+        MA_ACL_DEF   = (1 << 5)
 };
 
 typedef enum {
-        MDL_MINMODE = 0,
-        MDL_EX      = 1,
-        MDL_PW      = 2,
-        MDL_PR      = 4,
-        MDL_CW      = 8,
-        MDL_CR      = 16,
-        MDL_NL      = 32,
-        MDL_GROUP   = 64,
+        MDL_MINMODE  = 0,
+        MDL_EX       = 1,
+        MDL_PW       = 2,
+        MDL_PR       = 4,
+        MDL_CW       = 8,
+        MDL_CR       = 16,
+        MDL_NL       = 32,
+        MDL_GROUP    = 64,
         MDL_MAXMODE
 } mdl_mode_t;
 
+typedef enum {
+        MDT_NUL_LOCK = 0,
+        MDT_REG_LOCK = (1 << 0),
+        MDT_PDO_LOCK = (1 << 1)
+} mdl_type_t;
+
 struct md_attr {
         __u64                   ma_valid;
         __u64                   ma_need;
index 08e92c5..31346d0 100644 (file)
@@ -692,6 +692,13 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                                              LDLM_IBITS, &policy, mode, &lockh);
                 }
 
+                if (!rc) {
+                        mode = LCK_PW;
+                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                                             LDLM_FL_BLOCK_GRANTED, &res_id,
+                                             LDLM_IBITS, &policy, mode, &lockh);
+                }
+
                 if (rc) {
                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
                                sizeof(lockh));
index c5c37ce..5486ebd 100644 (file)
@@ -1119,7 +1119,7 @@ static int mdd_create(const struct lu_env *env,
 #endif
 
         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
-                                     son, ma, handle);
+                                   son, ma, handle);
         mdd_write_unlock(env, son);
         if (rc)
                 /*
index 5c67bd9..4e6a2e7 100644 (file)
@@ -11,6 +11,7 @@
  *   Author: Mike Shaver <shaver@clusterfs.com>
  *   Author: Nikita Danilov <nikita@clusterfs.com>
  *   Author: Huang Hua <huanghua@clusterfs.com>
+ *   Author: Yury Umanets <umka@clusterfs.com>
  *
  *   This file is part of the Lustre file system, http://www.lustre.org
  *   Lustre is a trademark of Cluster File Systems, Inc.
@@ -154,6 +155,133 @@ void mdt_set_disposition(struct mdt_thread_info *info,
                 rep->lock_policy_res1 |= flag;
 }
 
+#ifdef CONFIG_PDIROPS
+static mdl_mode_t mdt_mdl_lock_modes[] = {
+        [0] = MDL_MINMODE,
+        [1] = MDL_EX,
+        [2] = MDL_PW,
+        [3] = MDL_PR,
+        [4] = MDL_CW,
+        [5] = MDL_CR,
+        [6] = MDL_NL,
+        [7] = MDL_GROUP
+};
+
+static ldlm_mode_t mdt_ldlm_lock_modes[] = {
+        [0] = LCK_MINMODE,
+        [1] = LCK_EX,
+        [2] = LCK_PW,
+        [3] = LCK_PR,
+        [4] = LCK_CW,
+        [5] = LCK_CR,
+        [6] = LCK_NL,
+        [7] = LCK_GROUP
+};
+
+static inline mdl_mode_t mdt_ldlm_mode2mdl_mode(ldlm_mode_t mode)
+{
+        int idx = ffs((int)mode);
+        
+        LASSERT(idx >= 0);
+        LASSERT(IS_PO2(mode));
+        LASSERT(idx < ARRAY_SIZE(mdt_mdl_lock_modes));
+        return mdt_mdl_lock_modes[idx];
+}
+
+static inline ldlm_mode_t mdt_mdl_mode2ldlm_mode(mdl_mode_t mode)
+{
+        int idx = ffs((int)mode);
+        
+        LASSERT(idx >= 0);
+        LASSERT(IS_PO2(mode));
+        LASSERT(idx < ARRAY_SIZE(mdt_ldlm_lock_modes));
+        return mdt_ldlm_lock_modes[idx];
+}
+#endif
+
+void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm)
+{
+        lh->mlh_pdo_hash = 0;
+        lh->mlh_reg_mode = lm;
+        lh->mlh_type = MDT_REG_LOCK;
+}
+
+void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm,
+                       const char *name, int namelen)
+{
+        lh->mlh_reg_mode = lm;
+        lh->mlh_type = MDT_PDO_LOCK;
+        lh->mlh_pdo_hash = (name != NULL && namelen > 0 ?
+                            full_name_hash(name, namelen) : 0);
+}
+
+#ifdef CONFIG_PDIROPS
+static ldlm_mode_t mdt_lock_pdo_mode(struct mdt_thread_info *info,
+                                     struct mdt_object *o,
+                                     ldlm_mode_t lm)
+{
+        mdl_mode_t mode;
+
+        /*
+         * Pick the mode of the whole-directory (PDO) lock for a request that
+         * wants @lm on part of dir @o; LCK_MINMODE means "take no PDO lock".
+         *
+         * Any dir access needs couple of locks:
+         *
+         * 1) on part of dir we gonna take lookup/modify;
+         *
+         * 2) on whole dir to protect it from concurrent splitting and/or to
+         * flush client's cache for readdir().
+         *
+         * For a given mode and object this routine decides what lock mode to
+         * use for lock #2:
+         *
+         * 1) lookup in dir: only protect dir from being split - LCK_CR;
+         *
+         * 2) modify dir: protect from split and flush cache - LCK_CW;
+         *
+         * 3) modify dir that seems ready for splitting: protect it from any
+         * type of access (lookup/modify/split) - LCK_EX --bzzz
+         */
+
+        LASSERT(lm != LCK_MINMODE);
+
+        if (mdt_object_exists(o) > 0) {
+                /*
+                 * Ask underlying level its opinion about possible locks.
+                 */
+                mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
+                                     mdt_ldlm_mode2mdl_mode(lm));
+        } else {
+                /* Default locks for non-existing objects. */
+                mode = MDL_MINMODE;
+        }
+
+        if (mode != MDL_MINMODE) {
+                /* Lower layer said what lock mode it likes to be, use it. */
+                return mdt_mdl_mode2ldlm_mode(mode);
+        } else {
+                /*
+                 * Lower layer does not want to specify locking mode. We do it
+                 * ourselves. No special protection is needed, just flush
+                 * client's cache on modification.
+                 */
+                if (lm == LCK_EX) {
+                        return LCK_EX;
+                } else if (lm == LCK_PR) {
+                        return LCK_CR;
+                } else if (lm == LCK_PW) {
+                        return LCK_CW;
+                } else {
+                        /* Report the unexpected request mode, not @mode. */
+                        CWARN("Not expected lock type (0x%x)\n", (int)lm);
+                }
+        }
+
+        return LCK_MINMODE;
+}
+#endif
+
 static int mdt_getstatus(struct mdt_thread_info *info)
 {
         struct mdt_device *mdt  = info->mti_mdt;
@@ -553,7 +681,7 @@ static int mdt_is_subdir(struct mdt_thread_info *info)
          * Save error code to ->mode. Later it it is used for detecting the case
          * of remote subdir.
          */
-        repbody->mode = rc;
+        repbody->mode = rc < 0 ? -rc : rc;
         repbody->valid = OBD_MD_FLMODE;
 
         if (rc == -EREMOTE)
@@ -609,7 +737,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         struct mdt_object     *child;
         struct md_object      *next = mdt_object_child(info->mti_object);
         struct lu_fid         *child_fid = &info->mti_tmp_fid1;
-        int                    is_resent, rc;
+        int                    is_resent, rc, namelen = 0;
         const char            *name;
         struct mdt_lock_handle *lhp;
         struct ldlm_lock      *lock;
@@ -624,6 +752,9 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         if (name == NULL)
                 RETURN(err_serious(-EFAULT));
 
+        namelen = req_capsule_get_size(&info->mti_pill, &RMF_NAME,
+                                       RCL_CLIENT);
+
         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
                         PFID(mdt_object_fid(parent)), name, ldlm_rep);
 
@@ -666,7 +797,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         rc = 0;
                 } else {
                         mdt_lock_handle_init(lhc);
-                        lhc->mlh_reg_mode = LCK_CR;
+                        mdt_lock_reg_init(lhc, MDT_RD_LOCK);
 
                         /*
                          * Object's name is on another MDS, no lookup lock is
@@ -674,7 +805,9 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                          */
                         child_bits &= ~MDS_INODELOCK_LOOKUP;
                         child_bits |= MDS_INODELOCK_UPDATE;
-                        rc = mdt_object_lock(info, child, lhc, child_bits);
+                        
+                        rc = mdt_object_lock(info, child, lhc, child_bits,
+                                             MDT_LOCAL_LOCK);
                 }
                 if (rc == 0) {
                         /* Finally, we can get attr for child. */
@@ -689,8 +822,9 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
         /*step 1: lock parent */
         lhp = &info->mti_lh[MDT_LH_PARENT];
-        lhp->mlh_reg_mode = LCK_CR;
-        rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
+        mdt_lock_pdo_init(lhp, MDT_RD_LOCK, name, namelen);
+        rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
+                             MDT_LOCAL_LOCK);
         if (rc != 0)
                 RETURN(rc);
 
@@ -722,8 +856,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 LDLM_LOCK_PUT(lock);
         } else {
                 mdt_lock_handle_init(lhc);
-                lhc->mlh_reg_mode = LCK_CR;
-                rc = mdt_object_cr_lock(info, child, lhc, child_bits);
+                mdt_lock_reg_init(lhc, MDT_RD_LOCK);
+
+                rc = mdt_object_lock(info, child, lhc, child_bits,
+                                     MDT_CROSS_LOCK);
                 if (rc != 0)
                         GOTO(out_child, rc);
         }
@@ -1411,141 +1547,66 @@ struct mdt_object *mdt_object_find(const struct lu_env *env,
         RETURN(m);
 }
 
-static mdl_mode_t mdt_mdl_lock_modes[] = {
-        [0] = MDL_MINMODE,
-        [1] = MDL_EX,
-        [2] = MDL_PW,
-        [3] = MDL_PR,
-        [4] = MDL_CW,
-        [5] = MDL_CR,
-        [6] = MDL_NL,
-        [7] = MDL_GROUP
-};
-
-static ldlm_mode_t mdt_ldlm_lock_modes[] = {
-        [0] = LCK_MINMODE,
-        [1] = LCK_EX,
-        [2] = LCK_PW,
-        [3] = LCK_PR,
-        [4] = LCK_CW,
-        [5] = LCK_CR,
-        [6] = LCK_NL,
-        [7] = LCK_GROUP
-};
-
-static inline mdl_mode_t mdt_ldlm_mode2mdl_mode(ldlm_mode_t mode)
-{
-        int idx = ffs((int)mode) - 1;
-        LASSERT(idx >= 0);
-        LASSERT(IS_PO2(mode));
-        LASSERT(idx < ARRAY_SIZE(mdt_mdl_lock_modes));
-        return mdt_mdl_lock_modes[idx];
-}
-
-static inline ldlm_mode_t mdt_mdl_mode2ldlm_mode(mdl_mode_t mode)
-{
-        int idx = ffs((int)mode) - 1;
-        LASSERT(idx >= 0);
-        LASSERT(IS_PO2(mode));
-        LASSERT(idx < ARRAY_SIZE(mdt_ldlm_lock_modes));
-        return mdt_ldlm_lock_modes[idx];
-}
-
-int mdt_lock_init_mode(struct mdt_thread_info *info, struct mdt_object *o,
-                       struct mdt_lock_handle *lh, ldlm_mode_t lm)
-{
-        ENTRY;
-
-        lh->mlh_reg_mode = lm;
-        
-#ifdef CONFIG_PDIROPS
-        {
-                mdl_mode_t mode;
-                
-                /*
-                 * Any dir access needs couple of locks:
-                 *
-                 * 1) on part of dir we gonna take lookup/modify;
-                 *
-                 * 2) on whole dir to protect it from concurrent splitting
-                 * and/or to flush client's cache for readdir().
-                 *
-                 * so, for a given mode and object this routine decides what
-                 * lock mode to use for lock #2:
-                 *
-                 * 1) if caller's gonna lookup in dir then we need to protect
-                 * dir from being splitted only - LCK_CR
-                 *
-                 * 2) if caller's gonna modify dir then we need to protect dir
-                 * from being splitted and to flush cache - LCK_CW
-                 *
-                 * 3) if caller's gonna modify dir and that dir seems ready for
-                 * splitting then we need to protect it from any type of access
-                 * (lookup/modify/split) - LCK_EX  --bzzz
-                 */
-
-                /* Ask underlaying level its opinion about possible locks. */
-                mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
-                                     mdt_ldlm_mode2mdl_mode(lm));
-                if (mode != MDL_MINMODE) {
-                        /* Lower layer said what lock mode it likes to be, use it. */
-                        lh->mlh_pdo_mode = mdt_mdl_mode2ldlm_mode(mode);
-                } else {
-                        /* 
-                         * Lower layer does not want to specify locking mode. We od it
-                         * our selves. No special protection is needed, just flush
-                         * client's cache on modification.
-                         */
-                        if (lm == LCK_EX) {
-                                lh->mlh_pdo_mode = LCK_EX;
-                        } else if (lm == LCK_PR) {
-                                lh->mlh_pdo_mode = LCK_CR;
-                        } else if (lm == LCK_PW) {
-                                lh->mlh_pdo_mode = LCK_CW;
-                        } else {
-                                CWARN("Not expected lock type (0x%x)\n", (int)lm);
-                                lh->mlh_pdo_mode = LCK_MINMODE;
-                        }
-                }
-        }
-#endif
-
-        RETURN(0);
-}
-
 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                    struct mdt_lock_handle *lh, __u64 ibits)
+                    struct mdt_lock_handle *lh, __u64 ibits, int locality)
 {
+        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
         ldlm_policy_data_t *policy = &info->mti_policy;
         struct ldlm_res_id *res_id = &info->mti_res_id;
-        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
         int rc;
         ENTRY;
 
         LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
+        LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
+
         if (mdt_object_exists(o) < 0) {
-                LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
-                LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+                if (locality == MDT_CROSS_LOCK) {
+                        /* cross-ref object fix */
+                        ibits &= ~MDS_INODELOCK_UPDATE;
+                        ibits |= MDS_INODELOCK_LOOKUP;
+                } else {
+                        LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
+                        LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+                }
         }
-        memset(policy, 0, sizeof *policy);
-        policy->l_inodebits.bits = ibits;
 
-        rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_reg_lh,
-                      lh->mlh_reg_mode, policy, res_id);
-        RETURN(rc);
-}
+        memset(policy, 0, sizeof *policy);
+        fid_build_reg_res_name(mdt_object_fid(o), res_id);
+        
+#ifdef CONFIG_PDIROPS
+        /* 
+         * Take PDO lock on whole directory and build correct @res_id for lock
+         * on part of directrory.
+         */
+        if (lh->mlh_type == MDT_PDO_LOCK && lh->mlh_pdo_hash != 0) {
+                lh->mlh_pdo_mode = mdt_lock_pdo_mode(info, o, lh->mlh_reg_mode);
+                if (lh->mlh_pdo_mode != LCK_MINMODE) {
+                        policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                        rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
+                                          policy, res_id, LDLM_FL_ATOMIC_CB);
+                        if (rc)
+                                RETURN(rc);
+                }
 
-/* lock with cross-ref fixes */
-int mdt_object_cr_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                       struct mdt_lock_handle *lh, __u64 ibits)
-{
-        if (mdt_object_exists(o) < 0) {
-                /* cross-ref object fix */
-                ibits &= ~MDS_INODELOCK_UPDATE;
-                ibits |= MDS_INODELOCK_LOOKUP;
+                fid_build_pdo_res_name(mdt_object_fid(o), lh->mlh_pdo_hash,
+                                       res_id);
         }
-        return mdt_object_lock(info, o, lh, ibits);
+#endif
+        
+        policy->l_inodebits.bits = ibits;
+        rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
+                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB);
+#ifdef CONFIG_PDIROPS
+        if (rc) {
+                if (lh->mlh_type == MDT_PDO_LOCK) {
+                        mdt_fid_unlock(&lh->mlh_pdo_lh, lh->mlh_pdo_mode);
+                        lh->mlh_pdo_lh.cookie = 0ull;
+                }
+        }
+#endif
+        
+        RETURN(rc);
 }
 
 /*
@@ -1556,17 +1617,25 @@ int mdt_object_cr_lock(struct mdt_thread_info *info, struct mdt_object *o,
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
                        struct mdt_lock_handle *lh, int decref)
 {
-        struct ptlrpc_request *req    = mdt_info_req(info);
-        struct lustre_handle  *handle = &lh->mlh_reg_lh;
-        ldlm_mode_t            mode   = lh->mlh_reg_mode;
+        struct ptlrpc_request *req = mdt_info_req(info);
         ENTRY;
 
-        if (lustre_handle_is_used(handle)) {
-                if (decref)
-                        fid_unlock(mdt_object_fid(o), handle, mode);
-                else
-                        ptlrpc_save_lock(req, handle, mode);
-                handle->cookie = 0;
+        /* Do not save PDO locks to request. */
+        if (lustre_handle_is_used(&lh->mlh_pdo_lh)) {
+                mdt_fid_unlock(&lh->mlh_pdo_lh,
+                               lh->mlh_pdo_mode);
+                lh->mlh_pdo_lh.cookie = 0;
+        }
+        
+        if (lustre_handle_is_used(&lh->mlh_reg_lh)) {
+                if (decref) {
+                        mdt_fid_unlock(&lh->mlh_reg_lh,
+                                       lh->mlh_reg_mode);
+                } else {
+                        ptlrpc_save_lock(req, &lh->mlh_reg_lh,
+                                         lh->mlh_reg_mode);
+                }
+                lh->mlh_reg_lh.cookie = 0;
         }
         EXIT;
 }
@@ -1582,7 +1651,8 @@ struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
         if (!IS_ERR(o)) {
                 int rc;
 
-                rc = mdt_object_lock(info, o, lh, ibits);
+                rc = mdt_object_lock(info, o, lh, ibits,
+                                     MDT_LOCAL_LOCK);
                 if (rc != 0) {
                         mdt_object_put(info->mti_env, o);
                         o = ERR_PTR(rc);
@@ -1851,6 +1921,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 
 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
 {
+        lh->mlh_type = MDT_PDO_LOCK;
         lh->mlh_reg_lh.cookie = 0ull;
         lh->mlh_reg_mode = LCK_MINMODE;
         lh->mlh_pdo_lh.cookie = 0ull;
@@ -1860,6 +1931,7 @@ void mdt_lock_handle_init(struct mdt_lock_handle *lh)
 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
 {
         LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
+        LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
 }
 
 /*
@@ -4212,7 +4284,7 @@ static int __init mdt_mod_init(void)
         int rc;
 
         printk(KERN_INFO "Lustre: MetaData Target; info@clusterfs.com\n");
-
+        
         mdt_num_threads = MDT_NUM_THREADS;
         lprocfs_init_vars(mdt, &lvars);
         rc = class_register_type(&mdt_obd_device_ops, NULL,
index 495b886..fcec96c 100644 (file)
@@ -205,6 +205,9 @@ struct mdt_object {
 };
 
 struct mdt_lock_handle {
+        /* Lock type: reg (for cross-ref use) or pdo lock. */
+        mdl_type_t              mlh_type;
+        
         /* Regular lock */
         struct lustre_handle    mlh_reg_lh;
         ldlm_mode_t             mlh_reg_mode;
@@ -212,6 +215,7 @@ struct mdt_lock_handle {
         /* Pdirops lock */
         struct lustre_handle    mlh_pdo_lh;
         ldlm_mode_t             mlh_pdo_mode;
+        unsigned int            mlh_pdo_hash;
 };
 
 enum {
@@ -223,14 +227,21 @@ enum {
         MDT_LH_NR
 };
 
+enum {
+        MDT_LOCAL_LOCK,
+        MDT_CROSS_LOCK
+};
+
 struct mdt_reint_record {
         mdt_reint_t          rr_opcode;
         const struct lu_fid *rr_fid1;
         const struct lu_fid *rr_fid2;
         const char          *rr_name;
+        int                  rr_namelen;
         const char          *rr_tgt;
-        int                  rr_eadatalen;
+        int                  rr_tgtlen;
         const void          *rr_eadata;
+        int                  rr_eadatalen;
         int                  rr_logcookielen;
         const struct llog_cookie  *rr_logcookies;
         __u32                rr_flags;
@@ -429,15 +440,21 @@ void mdt_set_disposition(struct mdt_thread_info *info,
 void mdt_clear_disposition(struct mdt_thread_info *info,
                         struct ldlm_reply *rep, int flag);
 
+void mdt_lock_pdo_init(struct mdt_lock_handle *lh,
+                       ldlm_mode_t lm, const char *name,
+                       int namelen);
+
+void mdt_lock_reg_init(struct mdt_lock_handle *lh,
+                       ldlm_mode_t lm);
+
+int mdt_lock_setup(struct mdt_thread_info *info,
+                   struct mdt_object *o,
+                   struct mdt_lock_handle *lh);
+
 int mdt_object_lock(struct mdt_thread_info *,
                     struct mdt_object *,
                     struct mdt_lock_handle *,
-                    __u64);
-
-int mdt_object_cr_lock(struct mdt_thread_info *,
-                       struct mdt_object *,
-                       struct mdt_lock_handle *,
-                       __u64);
+                    __u64, int);
 
 void mdt_object_unlock(struct mdt_thread_info *,
                        struct mdt_object *,
@@ -450,7 +467,7 @@ struct mdt_object *mdt_object_find(const struct lu_env *,
 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *,
                                         const struct lu_fid *,
                                         struct mdt_lock_handle *,
-                                        __u64 ibits);
+                                        __u64);
 void mdt_object_unlock_put(struct mdt_thread_info *,
                            struct mdt_object *,
                            struct mdt_lock_handle *,
@@ -640,6 +657,32 @@ static inline int is_identity_get_disabled(struct upcall_cache *cache)
         return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1;
 }
 
+/* Enqueues a local dlm lock in @ns on @res_id, storing its handle in @lh. */
+static inline int mdt_fid_lock(struct ldlm_namespace *ns,
+                               struct lustre_handle *lh,
+                               ldlm_mode_t mode,
+                               ldlm_policy_data_t *policy,
+                               struct ldlm_res_id *res_id,
+                               int flags)
+{
+        int rc;
+
+        LASSERT(ns != NULL);
+        LASSERT(lh != NULL);
+
+        rc = ldlm_cli_enqueue_local(ns, *res_id, LDLM_IBITS, policy,
+                                    mode, &flags, ldlm_blocking_ast,
+                                    ldlm_completion_ast, NULL, NULL,
+                                    0, NULL, lh);
+        return rc == ELDLM_OK ? 0 : -EIO;
+}
+
+static inline void mdt_fid_unlock(struct lustre_handle *lh,
+                                  ldlm_mode_t mode)
+{
+        ldlm_lock_decref(lh, mode);
+}
+
 /*
  * Capability
  */
@@ -663,5 +706,16 @@ static inline void mdt_set_capainfo(struct mdt_thread_info *info, int offset,
         ci->mc_fid[offset]  = fid;
         ci->mc_capa[offset] = capa;
 }
+
+#ifdef CONFIG_PDIROPS
+#define MDT_RD_LOCK LCK_PR
+#define MDT_WR_LOCK LCK_PW
+#define MDT_EX_LOCK LCK_EX
+#else
+#define MDT_RD_LOCK LCK_CR
+#define MDT_WR_LOCK LCK_EX
+#define MDT_EX_LOCK LCK_EX
+#endif
+
 #endif /* __KERNEL__ */
 #endif /* _MDT_H */
index b907ba5..2ec8416 100644 (file)
@@ -741,6 +741,8 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
         mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
 
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+        rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
+        
 #ifdef CONFIG_FS_POSIX_ACL
         if (sp->sp_cr_flags & MDS_CREATE_RMT_ACL) {
                 if (S_ISDIR(attr->la_mode))
@@ -822,6 +824,7 @@ static int mdt_link_unpack(struct mdt_thread_info *info)
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (rr->rr_name == NULL)
                 RETURN(-EFAULT);
+        rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
 
         RETURN(0);
 }
@@ -861,6 +864,7 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (rr->rr_name == NULL)
                 RETURN(-EFAULT);
+        rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
 
         RETURN(0);
 }
@@ -905,6 +909,8 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
         rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
         if (rr->rr_name == NULL || rr->rr_tgt == NULL)
                 RETURN(-EFAULT);
+        rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
+        rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT);
 
         RETURN(0);
 }
@@ -955,6 +961,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (rr->rr_name == NULL)
                 RETURN(-EFAULT);
+        rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
 
         if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
                 struct md_create_spec *sp = &info->mti_spec;
index 186ffeb..f6772b7 100644 (file)
@@ -149,8 +149,9 @@ int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o)
          * In the later case, mdt_reint_setattr will do it. */
         if (cancel && (info->mti_rr.rr_fid1 != NULL)) {
                 struct mdt_lock_handle  *lh = &info->mti_lh[MDT_LH_CHILD];
-                lh->mlh_reg_mode = LCK_EX;
-                rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE);
+                mdt_lock_reg_init(lh, MDT_EX_LOCK);
+                rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE,
+                                     MDT_LOCAL_LOCK);
                 if (rc == 0)
                         mdt_object_unlock(info, o, lh, 1);
         }
@@ -782,10 +783,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         }
 
         lh = &info->mti_lh[MDT_LH_PARENT];
-        if (!(create_flags & MDS_OPEN_CREAT))
-                lh->mlh_reg_mode = LCK_CR;
-        else
-                lh->mlh_reg_mode = LCK_EX;
+        if (!(create_flags & MDS_OPEN_CREAT)) {
+                mdt_lock_pdo_init(lh, MDT_RD_LOCK, rr->rr_name,
+                                  rr->rr_namelen);
+        } else {
+                mdt_lock_pdo_init(lh, MDT_WR_LOCK, rr->rr_name,
+                                  rr->rr_namelen);
+        }
         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
                                       MDS_INODELOCK_UPDATE);
         if (IS_ERR(parent))
@@ -886,10 +890,11 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                                 rc = 0;
                         } else {
                                 mdt_lock_handle_init(lhc);
-                                lhc->mlh_reg_mode = LCK_CR;
+                                mdt_lock_reg_init(lhc, MDT_RD_LOCK);
 
                                 rc = mdt_object_lock(info, child, lhc,
-                                                     MDS_INODELOCK_LOOKUP);
+                                                     MDS_INODELOCK_LOOKUP,
+                                                     MDT_LOCAL_LOCK);
                         }
                         repbody->fid1 = *mdt_object_fid(child);
                         repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
index abe6158..3fb783d 100644 (file)
 
 static int mdt_md_create(struct mdt_thread_info *info)
 {
-        struct mdt_device      *mdt = info->mti_mdt;
-        struct mdt_object      *parent;
-        struct mdt_object      *child;
-        struct mdt_lock_handle *lh;
-        struct mdt_body        *repbody;
-        struct md_attr         *ma = &info->mti_attr;
+        struct mdt_device       *mdt = info->mti_mdt;
+        struct mdt_object       *parent;
+        struct mdt_object       *child;
+        struct mdt_lock_handle  *lh;
+        struct mdt_body         *repbody;
+        struct md_attr          *ma = &info->mti_attr;
         struct mdt_reint_record *rr = &info->mti_rr;
         int rc;
         ENTRY;
@@ -54,7 +54,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
 
         lh = &info->mti_lh[MDT_LH_PARENT];
-        lh->mlh_reg_mode = LCK_EX;
+        mdt_lock_pdo_init(lh, MDT_WR_LOCK, rr->rr_name, rr->rr_namelen);
 
         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
                                       MDS_INODELOCK_UPDATE);
@@ -160,14 +160,14 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
                 RETURN(0);
 
         lh = &info->mti_lh[MDT_LH_PARENT];
-        lh->mlh_reg_mode = LCK_EX;
+        mdt_lock_pdo_init(lh, MDT_WR_LOCK, NULL, 0);
 
         if (!(flags & MRF_SETATTR_LOCKED)) {
                 __u64 lockpart = MDS_INODELOCK_UPDATE;
                 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
                         lockpart |= MDS_INODELOCK_LOOKUP;
 
-                rc = mdt_object_lock(info, mo, lh, lockpart);
+                rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
                 if (rc != 0)
                         GOTO(out, rc);
         }
@@ -334,8 +334,8 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         case S_IFBLK:
         case S_IFIFO:
         case S_IFSOCK:{
-                /* special file should stay on the same node as parent */
-                LASSERT(strlen(info->mti_rr.rr_name) > 0);
+                /* Special file should stay on the same node as parent. */
+                LASSERT(info->mti_rr.rr_namelen > 0);
                 rc = mdt_md_create(info);
                 break;
         }
@@ -367,7 +367,9 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
         /* step 1: lock the parent */
         parent_lh = &info->mti_lh[MDT_LH_PARENT];
-        parent_lh->mlh_reg_mode = LCK_EX;
+        mdt_lock_pdo_init(parent_lh, MDT_WR_LOCK, rr->rr_name,
+                          rr->rr_namelen);
+        
         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
                                   MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp))
@@ -424,8 +426,9 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         if (IS_ERR(mc))
                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
         child_lh = &info->mti_lh[MDT_LH_CHILD];
-        child_lh->mlh_reg_mode = LCK_EX;
-        rc = mdt_object_cr_lock(info, mc, child_lh, MDS_INODELOCK_FULL);
+        mdt_lock_reg_init(child_lh, MDT_EX_LOCK);
+        rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
+                             MDT_CROSS_LOCK);
         if (rc != 0)
                 GOTO(out_put_child, rc);
 
@@ -480,7 +483,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (rr->rr_name[0] == 0) {
                 /* MDT holding name ask us to add ref. */
                 lhs = &info->mti_lh[MDT_LH_CHILD];
-                lhs->mlh_reg_mode = LCK_EX;
+                mdt_lock_reg_init(lhs, MDT_EX_LOCK);
                 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
                                           MDS_INODELOCK_UPDATE);
                 if (IS_ERR(ms))
@@ -494,7 +497,8 @@ static int mdt_reint_link(struct mdt_thread_info *info,
 
         /* step 1: find & lock the target parent dir */
         lhp = &info->mti_lh[MDT_LH_PARENT];
-        lhp->mlh_reg_mode = LCK_EX;
+        mdt_lock_pdo_init(lhp, MDT_WR_LOCK, rr->rr_name,
+                          rr->rr_namelen);
         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
                                   MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp))
@@ -502,12 +506,13 @@ static int mdt_reint_link(struct mdt_thread_info *info,
 
         /* step 2: find & lock the source */
         lhs = &info->mti_lh[MDT_LH_CHILD];
-        lhs->mlh_reg_mode = LCK_EX;
+        mdt_lock_reg_init(lhs, MDT_EX_LOCK);
         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
         if (IS_ERR(ms))
                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
 
-        rc = mdt_object_cr_lock(info, ms, lhs, MDS_INODELOCK_UPDATE);
+        rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
+                             MDT_CROSS_LOCK);
         if (rc != 0)
                 GOTO(out_unlock_source, rc);
 
@@ -545,7 +550,8 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
 
         /* step 1: lookup & lock the tgt dir */
         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
-        lh_tgtdir->mlh_reg_mode = LCK_EX;
+        mdt_lock_pdo_init(lh_tgtdir, MDT_WR_LOCK, rr->rr_tgt,
+                          rr->rr_tgtlen);
         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(mtgtdir))
@@ -558,13 +564,15 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
         if (rc != 0 && rc != -ENOENT) {
                 GOTO(out_unlock_tgtdir, rc);
         } else if (rc == 0) {
-                /* in case of replay that name can be already inserted,
-                 * check that and do nothing if so */
+                /*
+                 * In case of replay that name can be already inserted, check
+                 * that and do nothing if so.
+                 */
                 if (lu_fid_eq(tgt_fid, rr->rr_fid2))
                         GOTO(out_unlock_tgtdir, rc);
 
                 lh_tgt = &info->mti_lh[MDT_LH_CHILD];
-                lh_tgt->mlh_reg_mode = LCK_EX;
+                mdt_lock_reg_init(lh_tgt, MDT_EX_LOCK);
 
                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
                                             MDS_INODELOCK_LOOKUP);
@@ -584,10 +592,9 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
         if (rc == 0 && mtgt)
                 mdt_handle_last_unlink(info, mtgt, ma);
 
-        EXIT;
-        if (mtgt) {
+        if (mtgt != NULL)
                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
-        }
+        EXIT;
 out_unlock_tgtdir:
         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
 out:
@@ -607,7 +614,7 @@ static int mdt_rename_lock(struct mdt_thread_info *info,
         ENTRY;
 
         ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
-        fid_build_res_name(&LUSTRE_BFL_FID, &res_id);
+        fid_build_reg_res_name(&LUSTRE_BFL_FID, &res_id);
 
         if (ls->ls_control_exp == NULL) {
                 /*
@@ -640,11 +647,11 @@ static void mdt_rename_unlock(struct lustre_handle *lh)
 }
 
 /*
- * This is is_subdir() variant, it is CMD is cmm forwards it to correct
+ * This is is_subdir() variant, it is CMD if cmm forwards it to correct
  * target. Source should not be ancestor of target dir. May be other rename
  * checks can be moved here later.
  */
-static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid)
+static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
 {
         struct mdt_reint_record *rr = &info->mti_rr;
         struct lu_fid dst_fid = *rr->rr_fid2;
@@ -711,17 +718,19 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
 
         lh_newp = &info->mti_lh[MDT_LH_NEW];
 
-        /* step 1: lock the source dir */
+        /* step 1: lock the source dir. */
         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
-        lh_srcdirp->mlh_reg_mode = LCK_EX;
+        mdt_lock_pdo_init(lh_srcdirp, MDT_WR_LOCK, rr->rr_name,
+                          rr->rr_namelen);
         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(msrcdir))
                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
 
-        /*step 2: find & lock the target dir*/
+        /* step 2: find & lock the target dir. */
         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
-        lh_tgtdirp->mlh_reg_mode = LCK_EX;
+        mdt_lock_pdo_init(lh_tgtdirp, MDT_WR_LOCK, rr->rr_tgt,
+                          rr->rr_tgtlen);
         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
                 mdt_object_get(info->mti_env, msrcdir);
                 mtgtdir = msrcdir;
@@ -735,15 +744,16 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 if (rc == 0)
                         GOTO(out_unlock_target, rc = -ESTALE);
                 else if (rc > 0) {
-                        /* we lock the target dir iff it is local */
+                        /* we lock the target dir if it is local */
                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
-                                             MDS_INODELOCK_UPDATE);
+                                             MDS_INODELOCK_UPDATE,
+                                             MDT_LOCAL_LOCK);
                         if (rc != 0)
                                 GOTO(out_unlock_target, rc);
                 }
         }
 
-        /*step 3: find & lock the old object*/
+        /* step 3: find & lock the old object. */
         rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
                         rr->rr_name, old_fid);
         if (rc != 0)
@@ -753,18 +763,18 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 GOTO(out_unlock_target, rc = -EINVAL);
 
         lh_oldp = &info->mti_lh[MDT_LH_OLD];
-        lh_oldp->mlh_reg_mode = LCK_EX;
+        mdt_lock_reg_init(lh_oldp, MDT_EX_LOCK);
         mold = mdt_object_find_lock(info, old_fid, lh_oldp,
                                     MDS_INODELOCK_LOOKUP);
         if (IS_ERR(mold))
                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
 
-        /*step 4: find & lock the new object*/
+        /* step 4: find & lock the new object. */
         /* new target object may not exist now */
         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
                         rr->rr_tgt, new_fid);
         if (rc == 0) {
-                /* the new_fid should have been filled at this moment*/
+                /* the new_fid should have been filled at this moment */
                 if (lu_fid_eq(old_fid, new_fid))
                        GOTO(out_unlock_old, rc);
 
@@ -772,13 +782,13 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                     lu_fid_eq(new_fid, rr->rr_fid2))
                         GOTO(out_unlock_old, rc = -EINVAL);
 
-                lh_newp->mlh_reg_mode = LCK_EX;
+                mdt_lock_reg_init(lh_newp, MDT_EX_LOCK);
                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
                 if (IS_ERR(mnew))
                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
 
-                rc = mdt_object_cr_lock(info, mnew, lh_newp,
-                                        MDS_INODELOCK_FULL);
+                rc = mdt_object_lock(info, mnew, lh_newp,
+                                     MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
                 if (rc != 0) {
                         mdt_object_put(info->mti_env, mnew);
                         GOTO(out_unlock_old, rc);
@@ -806,8 +816,9 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
 
         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
         mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
+        
         /* Check if @dst is subdir of @src. */
-        rc = mdt_rename_check(info, old_fid);
+        rc = mdt_rename_sanity(info, old_fid);
         if (rc)
                 GOTO(out_unlock_new, rc);
 
index 81c579e..8e899cb 100644 (file)
@@ -318,8 +318,8 @@ int mdt_setxattr(struct mdt_thread_info *info)
                 lockpart |= MDS_INODELOCK_LOOKUP;
 
         lh = &info->mti_lh[MDT_LH_PARENT];
-        lh->mlh_reg_mode = LCK_EX;
-        rc = mdt_object_lock(info, obj, lh, lockpart);
+        mdt_lock_pdo_init(lh, MDT_WR_LOCK, NULL, 0);
+        rc = mdt_object_lock(info, obj, lh, lockpart, MDT_LOCAL_LOCK);
         if (rc != 0)
                 GOTO(out, rc);
 
index d06c38d..eff3744 100644 (file)
@@ -130,6 +130,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
          * This is the only place where object fid is assigned. It's constant
          * after this point.
          */
+        LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
         top->lo_header->loh_fid  = *f;
         layers = &top->lo_header->loh_layers;
         do {
index b15f626..9bf9870 100644 (file)
 #include "osd_oi.h"
 #include "osd_igif.h"
 
-int lu_fid_is_igif(const struct lu_fid *fid)
-{
-        return fid_seq(fid) == LUSTRE_ROOT_FID_SEQ;
-}
-
 void lu_igif_to_id(const struct lu_fid *fid, struct osd_inode_id *id)
 {
-        LASSERT(lu_fid_is_igif(fid));
+        LASSERT(fid_is_igif(fid));
         id->oii_ino = lu_igif_ino(fid);
         id->oii_gen = lu_igif_gen(fid);
 }
 
 __u32 lu_igif_ino(const struct lu_fid *fid)
 {
-        LASSERT(lu_fid_is_igif(fid));
+        LASSERT(fid_is_igif(fid));
         return fid_oid(fid);
 }
 
 __u32 lu_igif_gen(const struct lu_fid *fid)
 {
-        LASSERT(lu_fid_is_igif(fid));
+        LASSERT(fid_is_igif(fid));
         return fid_ver(fid);
 }
 
@@ -71,5 +66,5 @@ void lu_igif_build(struct lu_fid *fid, __u32 ino, __u32 gen)
         fid->f_seq = LUSTRE_ROOT_FID_SEQ;
         fid->f_oid = ino;
         fid->f_ver = gen;
-        LASSERT(lu_fid_is_igif(fid));
+        LASSERT(fid_is_igif(fid));
 }
index d40e630..04439d1 100644 (file)
@@ -34,7 +34,6 @@
 struct lu_fid;
 struct osd_inode_id;
 
-int lu_fid_is_igif(const struct lu_fid *fid);
 void lu_igif_to_id(const struct lu_fid *fid, struct osd_inode_id *id);
 __u32 lu_igif_ino(const struct lu_fid *fid);
 __u32 lu_igif_gen(const struct lu_fid *fid);
index 9fda0f8..0494c3b 100644 (file)
@@ -56,7 +56,6 @@
 #include "osd_oi.h"
 /* osd_lookup(), struct osd_thread_info */
 #include "osd_internal.h"
-/* lu_fid_is_igif() */
 #include "osd_igif.h"
 #include "dt_object.h"
 
@@ -158,7 +157,7 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
 {
         int rc;
 
-        if (lu_fid_is_igif(fid)) {
+        if (fid_is_igif(fid)) {
                 lu_igif_to_id(fid, id);
                 rc = 0;
         } else {
@@ -182,7 +181,7 @@ int osd_oi_insert(struct osd_thread_info *info, struct osd_oi *oi,
         struct dt_device    *dev;
         struct osd_inode_id *id;
 
-        if (lu_fid_is_igif(fid))
+        if (fid_is_igif(fid))
                 return 0;
 
         idx = oi->oi_dir;
@@ -205,7 +204,7 @@ int osd_oi_delete(struct osd_thread_info *info,
         struct dt_object *idx;
         struct dt_device *dev;
 
-        if (lu_fid_is_igif(fid))
+        if (fid_is_igif(fid))
                 return 0;
 
         idx = oi->oi_dir;