Whamcloud - gitweb
- cleanups in cmm split a bit;
authoryury <yury>
Thu, 19 Oct 2006 08:07:10 +0000 (08:07 +0000)
committeryury <yury>
Thu, 19 Oct 2006 08:07:10 +0000 (08:07 +0000)
- preparation stage for adding ldlm part of pdirops (not used yet, all stuff is under CONFIG_PDIROPS)

lustre/cmm/cmm_internal.h
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c
lustre/include/lustre/lustre_idl.h
lustre/include/md_object.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c

index fd1b278..b142cc6 100644 (file)
 #include <md_object.h>
 #include <linux/lustre_acl.h>
 
+#define CMM_NO_SPLIT_EXPECTED   0
+#define CMM_EXPECT_SPLIT        1
+#define CMM_NOT_SPLITTABLE      2
+
+enum {
+        CMM_SPLIT_SIZE =  64 * 1024
+};
+
 struct cmm_device {
         struct md_device       cmm_md_dev;
         /* device flags, taken from enum cmm_flags */
@@ -137,9 +145,15 @@ struct lu_object *cmm_object_alloc(const struct lu_env *env,
 int cmm_upcall(const struct lu_env *env, struct md_device *md,
                enum md_upcall_event ev);
 #ifdef HAVE_SPLIT_SUPPORT
+
 /* cmm_split.c */
-int cml_try_to_split(const struct lu_env *env,
-                     struct md_object *mo);
+int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
+                     const char *name);
+
+int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
+                         struct md_attr *ma);
+
+int cmm_try_to_split(const struct lu_env *env, struct md_object *mo);
 #endif
 
 #endif /* __KERNEL__ */
index 854289b..8294033 100644 (file)
@@ -361,6 +361,7 @@ static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
 {
         int rc;
         ENTRY;
+
 #ifdef HAVE_SPLIT_SUPPORT
         rc = cmm_mdsnum_check(env, mo_p, name);
         if (rc)
@@ -371,6 +372,32 @@ static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
 
 }
 
+static lu_mode_t cml_lock_mode(const struct lu_env *env,
+                               struct md_object *mo, lu_mode_t lm)
+{
+        ENTRY;
+#ifdef HAVE_SPLIT_SUPPORT
+        if (lm == LU_EX) {
+                RETURN(LU_EX);
+        } else if (lm == LU_PR) {
+                RETURN(LU_CR);
+        } else if (lm == LU_PW) {
+                struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+                int split;
+
+                memset(ma, 0, sizeof(*ma));
+                split = cmm_expect_splitting(env, mo, ma);
+
+                if (split == CMM_EXPECT_SPLIT) {
+                        RETURN(LU_EX);
+                } else {
+                        RETURN(LU_CW);
+                }
+        }
+#endif
+        RETURN(LU_MINMODE);
+}
+
 static int cml_create(const struct lu_env *env,
                       struct md_object *mo_p, const char *child_name,
                       struct md_object *mo_c, struct md_create_spec *spec,
@@ -380,7 +407,7 @@ static int cml_create(const struct lu_env *env,
         ENTRY;
 
 #ifdef HAVE_SPLIT_SUPPORT
-        rc = cml_try_to_split(env, mo_p);
+        rc = cmm_try_to_split(env, mo_p);
         if (rc)
                 RETURN(rc);
 #endif
@@ -546,6 +573,7 @@ static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
 static struct md_dir_operations cml_dir_ops = {
         .mdo_is_subdir   = cmm_is_subdir,
         .mdo_lookup      = cml_lookup,
+        .mdo_lock_mode   = cml_lock_mode,
         .mdo_create      = cml_create,
         .mdo_link        = cml_link,
         .mdo_unlink      = cml_unlink,
@@ -757,6 +785,12 @@ static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
         RETURN(-EREMOTE);
 }
 
+static lu_mode_t cmr_lock_mode(const struct lu_env *env,
+                               struct md_object *mo, lu_mode_t lm)
+{
+        RETURN(LU_MINMODE);
+}
+
 /*
  * All methods below are cross-ref by nature. They consist of remote call and
  * local operation. Due to future rollback functionality there are several
@@ -904,6 +938,7 @@ static int cmr_rename_tgt(const struct lu_env *env,
 static struct md_dir_operations cmr_dir_ops = {
         .mdo_is_subdir   = cmm_is_subdir,
         .mdo_lookup      = cmr_lookup,
+        .mdo_lock_mode   = cmr_lock_mode,
         .mdo_create      = cmr_create,
         .mdo_link        = cmr_link,
         .mdo_unlink      = cmr_unlink,
index b6e287a..ac12864 100644 (file)
 #include "cmm_internal.h"
 #include "mdc_internal.h"
 
-#define CMM_NO_SPLIT_EXPECTED   0
-#define CMM_EXPECT_SPLIT        1
-#define CMM_NO_SPLITTABLE       2
-
-enum {
-        SPLIT_SIZE =  64*1024
-};
-
 static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
                                   ssize_t len)
 {
@@ -60,12 +52,13 @@ static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
 }
 
 int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
-                   const char *name)
+                     const char *name)
 {
         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
         int rc;
         ENTRY;
-        /* try to get the LMV EA size */
+        
+        /* Try to get the LMV EA size */
         memset(ma, 0, sizeof(*ma));
         ma->ma_need = MA_INODE | MA_LMV;
         rc = mo_attr_get(env, mp, ma);
@@ -79,20 +72,21 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
                 if (ma->ma_lmv == NULL)
                         RETURN(-ENOMEM);
 
-                /* get LMV EA */
+                /* Get LMV EA */
                 ma->ma_need = MA_INODE | MA_LMV;
                 rc = mo_attr_get(env, mp, ma);
                 if (rc)
                         RETURN(rc);
                 
-                /* skip checking the slave dirs (mea_count == 0) */
+                /* Skip checking the slave dirs (mea_count == 0) */
                 if (ma->ma_lmv->mea_count != 0)
                         RETURN(0);
                 /* 
-                 * Get stripe by name to check the name belongs
-                 * to master dir, otherwise return the -ERESTART
+                 * Get stripe by name to check the name belongs to master dir,
+                 * otherwise return the -ERESTART
                  */
                 stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
+                
                 /* Master stripe is always 0 */
                 if (stripe != 0)
                         rc = -ERESTART;
@@ -102,23 +96,28 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
         RETURN(rc);
 }
 
-static int cmm_expect_splitting(const struct lu_env *env,
-                                struct md_object *mo,
-                                struct md_attr *ma)
+int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
+                         struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct lu_fid *fid = NULL;
         int rc = CMM_EXPECT_SPLIT;
         ENTRY;
 
+        ma->ma_need = MA_INODE | MA_LMV;
+        rc = mo_attr_get(env, mo, ma);
+        if (rc)
+                GOTO(cleanup, rc = CMM_NOT_SPLITTABLE);
+
         if (cmm->cmm_tgt_count == 0)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
-        if (ma->ma_attr.la_size < SPLIT_SIZE)
+        if (ma->ma_attr.la_size < CMM_SPLIT_SIZE)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
         if (ma->ma_lmv_size)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+        
         OBD_ALLOC_PTR(fid);
         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid);
         if (rc)
@@ -491,7 +490,7 @@ free_rdpg:
         return rc;
 }
 
-int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
+int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
@@ -500,14 +499,9 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
         ENTRY;
 
         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
-        
         memset(ma, 0, sizeof(*ma));
-        ma->ma_need = MA_INODE | MA_LMV;
-        rc = mo_attr_get(env, mo, ma);
-        if (rc)
-                GOTO(cleanup, ma);
 
-        /* step1: checking whether the dir need to be splitted */
+        /* Step1: Checking whether the dir needs to be split. */
         rc = cmm_expect_splitting(env, mo, ma);
         if (rc != CMM_EXPECT_SPLIT)
                 GOTO(cleanup, rc = 0);
@@ -520,23 +514,25 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
         if (rc)
                 GOTO(cleanup, rc = 0);
 
-        /* step2: create slave objects */
+        /* Step2: Create slave objects (on slave MDTs) */
         rc = cmm_slaves_create(env, mo, ma);
         if (rc)
                 GOTO(cleanup, ma);
 
-        /* step3: scan and split the object */
+        /* Step3: Scan and split the object. */
         rc = cmm_scan_and_split(env, mo, ma);
         if (rc)
                 GOTO(cleanup, ma);
 
         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
         
-        /* step4: set mea to the master object */
-        rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0);
-        if (rc == -ERESTART)
+        /* Step4: Set mea to the master object. */
+        rc = mo_xattr_set(env, md_object_next(mo), buf,
+                          MDS_LMV_MD_NAME, 0);
+        if (rc == -ERESTART) {
                 CWARN("Dir "DFID" has been split\n",
                       PFID(lu_object_fid(&mo->mo_lu)));
+        }
         EXIT;
 cleanup:
         if (ma->ma_lmv_size && ma->ma_lmv)
index 8c67dea..c23b7f8 100644 (file)
@@ -1443,17 +1443,29 @@ extern void lustre_swab_ldlm_res_id (struct ldlm_res_id *id);
 /* lock types */
 typedef enum {
         LCK_MINMODE = 0,
-        LCK_EX = 1,
-        LCK_PW = 2,
-        LCK_PR = 4,
-        LCK_CW = 8,
-        LCK_CR = 16,
-        LCK_NL = 32,
-        LCK_GROUP = 64,
+        LCK_EX      = 1,
+        LCK_PW      = 2,
+        LCK_PR      = 4,
+        LCK_CW      = 8,
+        LCK_CR      = 16,
+        LCK_NL      = 32,
+        LCK_GROUP   = 64,
         LCK_MAXMODE
 } ldlm_mode_t;
 
 typedef enum {
+        LU_MINMODE  = 0,
+        LU_EX       = 1,
+        LU_PW       = 2,
+        LU_PR       = 4,
+        LU_CW       = 8,
+        LU_CR       = 16,
+        LU_NL       = 32,
+        LU_GROUP    = 64,
+        LU_MAXMODE
+} lu_mode_t;
+
+typedef enum {
         LDLM_PLAIN     = 10,
         LDLM_EXTENT    = 11,
         LDLM_FLOCK     = 12,
index 37cf3c8..520ff21 100644 (file)
@@ -196,6 +196,9 @@ struct md_dir_operations {
         int (*mdo_lookup)(const struct lu_env *env, struct md_object *obj,
                           const char *name, struct lu_fid *fid);
 
+        lu_mode_t (*mdo_lock_mode)(const struct lu_env *env, struct md_object *obj,
+                                   lu_mode_t mode);
+
         int (*mdo_create)(const struct lu_env *env, struct md_object *pobj,
                           const char *name, struct md_object *child,
                           struct md_create_spec *spec,
@@ -454,6 +457,15 @@ static inline int mdo_lookup(const struct lu_env *env,
         return p->mo_dir_ops->mdo_lookup(env, p, name, f);
 }
 
+static inline lu_mode_t mdo_lock_mode(const struct lu_env *env,
+                                      struct md_object *mo,
+                                      lu_mode_t lm)
+{
+        if (mo->mo_dir_ops->mdo_lock_mode == NULL)
+                return LU_MINMODE;
+        return mo->mo_dir_ops->mdo_lock_mode(env, mo, lm);
+}
+
 static inline int mdo_create(const struct lu_env *env,
                              struct md_object *p,
                              const char *child_name,
index ca6ecca..ebab34b 100644 (file)
@@ -566,10 +566,10 @@ static int mdt_raw_lookup(struct mdt_thread_info *info,
                           const char* name, 
                           struct ldlm_reply *ldlm_rep)
 {
-        const struct mdt_body   *reqbody = info->mti_body;
-        struct md_object  *next = mdt_object_child(info->mti_object);
-        struct lu_fid     *child_fid = &info->mti_tmp_fid1;
-        struct mdt_body   *repbody;
+        struct md_object *next = mdt_object_child(info->mti_object);
+        const struct mdt_body *reqbody = info->mti_body;
+        struct lu_fid *child_fid = &info->mti_tmp_fid1;
+        struct mdt_body *repbody;
         int rc;
         ENTRY;
         
@@ -614,7 +614,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         struct ldlm_lock      *lock;
         ENTRY;
 
-        is_resent = lustre_handle_is_used(&lhc->mlh_lh);
+        is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
         if (is_resent)
                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
 
@@ -653,10 +653,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
                 if (is_resent) {
                         /* Do not take lock for resent case. */
-                        lock = ldlm_handle2lock(&lhc->mlh_lh);
+                        lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
                         if (!lock) {
                                 CERROR("Invalid lock handle "LPX64"\n",
-                                       lhc->mlh_lh.cookie);
+                                       lhc->mlh_reg_lh.cookie);
                                 LBUG();
                         }
                         LASSERT(fid_res_name_eq(mdt_object_fid(child),
@@ -665,7 +665,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         rc = 0;
                 } else {
                         mdt_lock_handle_init(lhc);
-                        lhc->mlh_mode = LCK_CR;
+                        lhc->mlh_reg_mode = LCK_CR;
 
                         /*
                          * Object's name is on another MDS, no lookup lock is
@@ -688,7 +688,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
         /*step 1: lock parent */
         lhp = &info->mti_lh[MDT_LH_PARENT];
-        lhp->mlh_mode = LCK_CR;
+        lhp->mlh_reg_mode = LCK_CR;
         rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
         if (rc != 0)
                 RETURN(rc);
@@ -710,10 +710,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 GOTO(out_parent, rc = PTR_ERR(child));
         if (is_resent) {
                 /* Do not take lock for resent case. */
-                lock = ldlm_handle2lock(&lhc->mlh_lh);
+                lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
                 if (!lock) {
                         CERROR("Invalid lock handle "LPX64"\n",
-                               lhc->mlh_lh.cookie);
+                               lhc->mlh_reg_lh.cookie);
                         LBUG();
                 }
                 LASSERT(fid_res_name_eq(child_fid,
@@ -721,7 +721,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 LDLM_LOCK_PUT(lock);
         } else {
                 mdt_lock_handle_init(lhc);
-                lhc->mlh_mode = LCK_CR;
+                lhc->mlh_reg_mode = LCK_CR;
                 rc = mdt_object_cr_lock(info, child, lhc, child_bits);
                 if (rc != 0)
                         GOTO(out_child, rc);
@@ -733,7 +733,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         if (rc != 0) {
                 mdt_object_unlock(info, child, lhc, 1);
         } else {
-                struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh);
+                struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
                 if (lock) {
                         struct ldlm_res_id *res_id;
                         struct mdt_body *repbody;
@@ -791,9 +791,9 @@ static int mdt_getattr_name(struct mdt_thread_info *info)
                 GOTO(out, rc);
 
         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
-        if (lustre_handle_is_used(&lhc->mlh_lh)) {
-                ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
-                lhc->mlh_lh.cookie = 0;
+        if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+                ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
+                lhc->mlh_reg_lh.cookie = 0;
         }
         mdt_exit_ucred(info);
         EXIT;
@@ -1409,6 +1409,112 @@ struct mdt_object *mdt_object_find(const struct lu_env *env,
         RETURN(m);
 }
 
+static inline lu_mode_t mdt_ldlm_mode2lu_mode(ldlm_mode_t mode)
+{
+        switch (mode) {
+        case LCK_MINMODE:
+                return LU_MINMODE;
+        case LCK_EX:
+                return LU_EX;
+        case LCK_PW:
+                return LU_PW;
+        case LCK_PR:
+                return LU_PR;
+        case LCK_CW:
+                return LU_CW;
+        case LCK_CR:
+                return LU_CR;
+        case LCK_NL:
+                return LU_NL;
+        case LCK_GROUP:
+                return LU_GROUP;
+        default:
+                return 0;
+        }
+}
+
+static inline ldlm_mode_t mdt_lu_mode2ldlm_mode(lu_mode_t mode)
+{
+        switch (mode) {
+        case LU_MINMODE:
+                return LCK_MINMODE;
+        case LU_EX:
+                return LCK_EX;
+        case LU_PW:
+                return LCK_PW;
+        case LU_PR:
+                return LCK_PR;
+        case LU_CW:
+                return LCK_CW;
+        case LU_CR:
+                return LCK_CR;
+        case LU_NL:
+                return LCK_NL;
+        case LU_GROUP:
+                return LCK_GROUP;
+        default:
+                return 0;
+        }
+}
+
+int mdt_object_lock_mode(struct mdt_thread_info *info,
+                         struct mdt_object *o,
+                         struct mdt_lock_handle *lh,
+                         ldlm_mode_t lm)
+{
+        ENTRY;
+
+        lh->mlh_reg_mode = lm;
+        
+#ifdef CONFIG_PDIROPS
+        {
+                lu_mode_t mode;
+                
+                /*
+                 * Any dir access needs couple of locks:
+                 *
+                 * 1) on part of dir we gonna take lookup/modify;
+                 *
+                 * 2) on whole dir to protect it from concurrent splitting
+                 * and/or to flush client's cache for readdir().
+                 *
+                 * so, for a given mode and object this routine decides what
+                 * lock mode to use for lock #2:
+                 *
+                 * 1) if caller's gonna lookup in dir then we need to protect
+                 * dir from being splitted only - LCK_CR
+                 *
+                 * 2) if caller's gonna modify dir then we need to protect dir
+                 * from being splitted and to flush cache - LCK_CW
+                 *
+                 * 3) if caller's gonna modify dir and that dir seems ready for
+                 * splitting then we need to protect it from any type of access
+                 * (lookup/modify/split) - LCK_EX  --bzzz
+                 */
+
+                /* Ask underlaying level its opinion about possible locks. */
+                mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
+                                     mdt_ldlm_mode2lu_mode(lm));
+                if (mode != LU_MINMODE) {
+                        /* Lower layer said what lock mode it likes to be, use it. */
+                        lh->mlh_pdo_mode = mdt_lu_mode2ldlm_mode(mode);
+                } else {
+                        /* 
+                         * Lower layer does not want to specify locking mode. We od it
+                         * our selves. No special protection is needed, just flush
+                         * client's cache on modification.
+                         */
+                        if (lm == LCK_PW)
+                                lh->mlh_pdo_mode = LCK_CW;
+                        else
+                                lh->mlh_pdo_mode = LCK_MINMODE;
+                }
+        }
+#endif
+
+        RETURN(0);
+}
+
 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
                     struct mdt_lock_handle *lh, __u64 ibits)
 {
@@ -1418,8 +1524,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
         int rc;
         ENTRY;
 
-        LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
-        LASSERT(lh->mlh_mode != LCK_MINMODE);
+        LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
+        LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
         if (mdt_object_exists(o) < 0) {
                 LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
                 LASSERT(ibits & MDS_INODELOCK_LOOKUP);
@@ -1427,8 +1533,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
         memset(policy, 0, sizeof *policy);
         policy->l_inodebits.bits = ibits;
 
-        rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode,
-                      policy, res_id);
+        rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_reg_lh,
+                      lh->mlh_reg_mode, policy, res_id);
         RETURN(rc);
 }
 
@@ -1453,8 +1559,8 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
                        struct mdt_lock_handle *lh, int decref)
 {
         struct ptlrpc_request *req    = mdt_info_req(info);
-        struct lustre_handle  *handle = &lh->mlh_lh;
-        ldlm_mode_t            mode   = lh->mlh_mode;
+        struct lustre_handle  *handle = &lh->mlh_reg_lh;
+        ldlm_mode_t            mode   = lh->mlh_reg_mode;
         ENTRY;
 
         if (lustre_handle_is_used(handle)) {
@@ -1794,13 +1900,15 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 
 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
 {
-        lh->mlh_lh.cookie = 0ull;
-        lh->mlh_mode = LCK_MINMODE;
+        lh->mlh_reg_lh.cookie = 0ull;
+        lh->mlh_reg_mode = LCK_MINMODE;
+        lh->mlh_pdo_lh.cookie = 0ull;
+        lh->mlh_pdo_mode = LCK_MINMODE;
 }
 
 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
 {
-        LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
+        LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
 }
 
 /*
@@ -1898,8 +2006,8 @@ static int mdt_recovery(struct mdt_thread_info *info)
             req->rq_xid == req_exp_last_close_xid(req)) {
                 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
                       (MSG_RESENT | MSG_REPLAY))) {
-                        CERROR("rq_xid "LPU64" matches last_xid, "
-                                "expected RESENT flag\n", req->rq_xid);
+                        DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches last_xid, "
+                                  "expected RESENT flag\n", req->rq_xid);
                         LBUG();
                         req->rq_status = -ENOTCONN;
                         RETURN(-ENOTCONN);
@@ -2167,15 +2275,15 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
          * lock.
          */
         if (new_lock == NULL)
-                new_lock = ldlm_handle2lock(&lh->mlh_lh);
+                new_lock = ldlm_handle2lock(&lh->mlh_reg_lh);
 
         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
-                lh->mlh_lh.cookie = 0;
+                lh->mlh_reg_lh.cookie = 0;
                 RETURN(0);
         }
 
         LASSERTF(new_lock != NULL,
-                 "lockh "LPX64"\n", lh->mlh_lh.cookie);
+                 "lockh "LPX64"\n", lh->mlh_reg_lh.cookie);
 
         /*
          * If we've already given this lock to a client once, then we should
@@ -2199,7 +2307,7 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
                  */
                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
                         MSG_RESENT);
-                lh->mlh_lh.cookie = 0;
+                lh->mlh_reg_lh.cookie = 0;
                 RETURN(ELDLM_LOCK_REPLACED);
         }
 
@@ -2219,7 +2327,7 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
 
         unlock_res_and_lock(new_lock);
         LDLM_LOCK_PUT(new_lock);
-        lh->mlh_lh.cookie = 0;
+        lh->mlh_reg_lh.cookie = 0;
 
         RETURN(ELDLM_LOCK_REPLACED);
 }
@@ -2248,12 +2356,12 @@ static void mdt_intent_fixup_resent(struct req_capsule *pill,
                 if (lock == new_lock)
                         continue;
                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
-                        lh->mlh_lh.cookie = lock->l_handle.h_cookie;
-                        lh->mlh_mode = lock->l_granted_mode;
+                        lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
+                        lh->mlh_reg_mode = lock->l_granted_mode;
 
                         LDLM_DEBUG(lock, "restoring lock cookie");
                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
-                                  lh->mlh_lh.cookie);
+                                  lh->mlh_reg_lh.cookie);
                         if (old_lock)
                                 *old_lock = LDLM_LOCK_GET(lock);
                         spin_unlock(&exp->exp_ldlm_data.led_lock);
@@ -2335,7 +2443,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                 ldlm_rep->lock_policy_res2 = 0;
         if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
             ldlm_rep->lock_policy_res2) {
-                lhc->mlh_lh.cookie = 0ull;
+                lhc->mlh_reg_lh.cookie = 0ull;
                 GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
         }
 
@@ -2390,13 +2498,13 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
 
         /* cross-ref case, the lock should be returned to the client */
         if (rc == -EREMOTE) {
-                LASSERT(lustre_handle_is_used(&lhc->mlh_lh));
+                LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
                 rep->lock_policy_res2 = 0;
                 RETURN(mdt_intent_lock_replace(info, lockp, NULL, lhc, flags));
         }
         rep->lock_policy_res2 = clear_serious(rc);
 
-        lhc->mlh_lh.cookie = 0ull;
+        lhc->mlh_reg_lh.cookie = 0ull;
         RETURN(ELDLM_LOCK_ABORTED);
 }
 
index ad44c01..3250a7c 100644 (file)
@@ -197,8 +197,13 @@ struct mdt_object {
 };
 
 struct mdt_lock_handle {
-        struct lustre_handle    mlh_lh;
-        ldlm_mode_t             mlh_mode;
+        /* Regular lock */
+        struct lustre_handle    mlh_reg_lh;
+        ldlm_mode_t             mlh_reg_mode;
+
+        /* Pdirops lock */
+        struct lustre_handle    mlh_pdo_lh;
+        ldlm_mode_t             mlh_pdo_mode;
 };
 
 enum {
index 2aadf00..cc20816 100644 (file)
@@ -148,7 +148,7 @@ int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o)
          * In the later case, mdt_reint_setattr will do it. */
         if (cancel && (info->mti_rr.rr_fid1 != NULL)) {
                 struct mdt_lock_handle  *lh = &info->mti_lh[MDT_LH_CHILD];
-                lh->mlh_mode = LCK_EX;
+                lh->mlh_reg_mode = LCK_EX;
                 rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE);
                 if (rc == 0)
                         mdt_object_unlock(info, o, lh, 1);
@@ -734,9 +734,9 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
         lh = &info->mti_lh[MDT_LH_PARENT];
         if (!(create_flags & MDS_OPEN_CREAT))
-                lh->mlh_mode = LCK_CR;
+                lh->mlh_reg_mode = LCK_CR;
         else
-                lh->mlh_mode = LCK_EX;
+                lh->mlh_reg_mode = LCK_EX;
         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
                                       MDS_INODELOCK_UPDATE);
         if (IS_ERR(parent))
@@ -819,16 +819,16 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                          */
                         LASSERT(lhc != NULL);
 
-                        if (lustre_handle_is_used(&lhc->mlh_lh)) {
+                        if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                                 struct ldlm_lock *lock;
 
                                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
                                         MSG_RESENT);
 
-                                lock = ldlm_handle2lock(&lhc->mlh_lh);
+                                lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
                                 if (!lock) {
                                         CERROR("Invalid lock handle "LPX64"\n",
-                                               lhc->mlh_lh.cookie);
+                                               lhc->mlh_reg_lh.cookie);
                                         LBUG();
                                 }
                                 LASSERT(fid_res_name_eq(mdt_object_fid(child),
@@ -837,7 +837,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                                 rc = 0;
                         } else {
                                 mdt_lock_handle_init(lhc);
-                                lhc->mlh_mode = LCK_CR;
+                                lhc->mlh_reg_mode = LCK_CR;
 
                                 rc = mdt_object_lock(info, child, lhc,
                                                      MDS_INODELOCK_LOOKUP);
index da49c6b..3911547 100644 (file)
@@ -51,7 +51,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
 
         lh = &info->mti_lh[MDT_LH_PARENT];
-        lh->mlh_mode = LCK_EX;
+        lh->mlh_reg_mode = LCK_EX;
 
         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
                                       MDS_INODELOCK_UPDATE);
@@ -152,7 +152,7 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
                 RETURN(0);
 
         lh = &info->mti_lh[MDT_LH_PARENT];
-        lh->mlh_mode = LCK_EX;
+        lh->mlh_reg_mode = LCK_EX;
 
         if (!(flags & MRF_SETATTR_LOCKED)) {
                 __u64 lockpart = MDS_INODELOCK_UPDATE;
@@ -359,7 +359,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
         /* step 1: lock the parent */
         parent_lh = &info->mti_lh[MDT_LH_PARENT];
-        parent_lh->mlh_mode = LCK_EX;
+        parent_lh->mlh_reg_mode = LCK_EX;
         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
                                   MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp))
@@ -416,7 +416,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         if (IS_ERR(mc))
                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
         child_lh = &info->mti_lh[MDT_LH_CHILD];
-        child_lh->mlh_mode = LCK_EX;
+        child_lh->mlh_reg_mode = LCK_EX;
         rc = mdt_object_cr_lock(info, mc, child_lh, MDS_INODELOCK_FULL);
         if (rc != 0)
                 GOTO(out_put_child, rc);
@@ -471,7 +471,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (rr->rr_name[0] == 0) {
                 /* MDT holding name ask us to add ref. */
                 lhs = &info->mti_lh[MDT_LH_CHILD];
-                lhs->mlh_mode = LCK_EX;
+                lhs->mlh_reg_mode = LCK_EX;
                 ms = mdt_object_find_lock(info, rr->rr_fid2, lhs,
                                           MDS_INODELOCK_UPDATE);
                 if (IS_ERR(ms))
@@ -485,7 +485,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
 
         /* step 1: find & lock the target parent dir */
         lhp = &info->mti_lh[MDT_LH_PARENT];
-        lhp->mlh_mode = LCK_EX;
+        lhp->mlh_reg_mode = LCK_EX;
         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
                                   MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp))
@@ -493,7 +493,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
 
         /* step 2: find & lock the source */
         lhs = &info->mti_lh[MDT_LH_CHILD];
-        lhs->mlh_mode = LCK_EX;
+        lhs->mlh_reg_mode = LCK_EX;
         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
         if (IS_ERR(ms))
                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
@@ -539,7 +539,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
         /* step 1: lookup & lock the tgt dir */
         lh_tgt = &info->mti_lh[MDT_LH_CHILD];
         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
-        lh_tgtdir->mlh_mode = LCK_EX;
+        lh_tgtdir->mlh_reg_mode = LCK_EX;
         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(mtgtdir))
@@ -557,7 +557,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
                 if (lu_fid_eq(tgt_fid, rr->rr_fid2))
                         GOTO(out_unlock_tgtdir, rc);
 
-                lh_tgt->mlh_mode = LCK_EX;
+                lh_tgt->mlh_reg_mode = LCK_EX;
 
                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
                                             MDS_INODELOCK_LOOKUP);
@@ -703,7 +703,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
 
         /* step 1: lock the source dir */
         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
-        lh_srcdirp->mlh_mode = LCK_EX;
+        lh_srcdirp->mlh_reg_mode = LCK_EX;
         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(msrcdir))
@@ -711,7 +711,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
 
         /*step 2: find & lock the target dir*/
         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
-        lh_tgtdirp->mlh_mode = LCK_EX;
+        lh_tgtdirp->mlh_reg_mode = LCK_EX;
         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
                 mdt_object_get(info->mti_env, msrcdir);
                 mtgtdir = msrcdir;
@@ -740,7 +740,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 GOTO(out_unlock_target, rc = -EINVAL);
 
         lh_oldp = &info->mti_lh[MDT_LH_OLD];
-        lh_oldp->mlh_mode = LCK_EX;
+        lh_oldp->mlh_reg_mode = LCK_EX;
         mold = mdt_object_find_lock(info, old_fid, lh_oldp,
                                     MDS_INODELOCK_LOOKUP);
         if (IS_ERR(mold))
@@ -759,7 +759,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                     lu_fid_eq(new_fid, rr->rr_fid2))
                         GOTO(out_unlock_old, rc = -EINVAL);
 
-                lh_newp->mlh_mode = LCK_EX;
+                lh_newp->mlh_reg_mode = LCK_EX;
                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
                 if (IS_ERR(mnew))
                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
@@ -809,9 +809,8 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 mdt_handle_last_unlink(info, mnew, ma);
 
 out_unlock_new:
-        if (mnew) {
+        if (mnew)
                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
-        }
 out_unlock_old:
         mdt_object_unlock_put(info, mold, lh_oldp, rc);
 out_unlock_target:
index 6ab8985..6fbf06e 100644 (file)
@@ -315,7 +315,7 @@ int mdt_setxattr(struct mdt_thread_info *info)
                 lockpart |= MDS_INODELOCK_LOOKUP;
 
         lh = &info->mti_lh[MDT_LH_PARENT];
-        lh->mlh_mode = LCK_EX;
+        lh->mlh_reg_mode = LCK_EX;
         rc = mdt_object_lock(info, obj, lh, lockpart);
         if (rc != 0)
                 GOTO(out, rc);