Whamcloud - gitweb
split optimization to avoid often attr_get() calling
authortappro <tappro>
Tue, 31 Oct 2006 19:47:33 +0000 (19:47 +0000)
committertappro <tappro>
Tue, 31 Oct 2006 19:47:33 +0000 (19:47 +0000)
lustre/cmm/cmm_internal.h
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c

index f48d2aa..b8d0983 100644 (file)
@@ -79,6 +79,15 @@ static inline struct lu_device *cmm2lu_dev(struct cmm_device *d)
         return (&d->cmm_md_dev.md_lu_dev);
 }
 
+#ifdef HAVE_SPLIT_SUPPORT
+enum {
+        CMM_SPLIT_UNKNOWN,
+        CMM_SPLIT_NONE,
+        CMM_SPLIT_NEEDED,
+        CMM_SPLIT_DONE,
+        CMM_SPLIT_DENIED
+};
+#endif
 struct cmm_object {
         struct md_object cmo_obj;
 };
@@ -86,6 +95,10 @@ struct cmm_object {
 /* local CMM object */
 struct cml_object {
         struct cmm_object cmm_obj;
+#ifdef HAVE_SPLIT_SUPPORT
+        /* split state of object (for dirs only)*/
+        __u32             clo_split;
+#endif
 };
 
 /* remote CMM object */
@@ -134,6 +147,21 @@ struct lu_object *cmm_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *);
 
+/*
+ * local CMM object operations. cml_...
+ */
+static inline struct cml_object *lu2cml_obj(struct lu_object *o)
+{
+        return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
+}
+static inline struct cml_object *md2cml_obj(struct md_object *mo)
+{
+        return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
+}
+static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
+{
+        return container_of0(co, struct cml_object, cmm_obj);
+}
 
 int cmm_upcall(const struct lu_env *env, struct md_device *md,
                enum md_upcall_event ev);
index 999c218..2423503 100644 (file)
@@ -130,21 +130,6 @@ struct lu_object *cmm_object_alloc(const struct lu_env *env,
  * of operations so we are avoiding multiple checks in code.
  */
 
-/*
- * local CMM object operations. cml_...
- */
-static inline struct cml_object *lu2cml_obj(struct lu_object *o)
-{
-        return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
-}
-static inline struct cml_object *md2cml_obj(struct md_object *mo)
-{
-        return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
-}
-static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
-{
-        return container_of0(co, struct cml_object, cmm_obj);
-}
 /* get local child device */
 static struct lu_device *cml_child_dev(struct cmm_device *d)
 {
@@ -169,6 +154,12 @@ static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
 
         ENTRY;
 
+#ifdef HAVE_SPLIT_SUPPORT
+        if(cd->cmm_tgt_count == 0)
+                lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
+        else
+                lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
+#endif
         c_dev = cml_child_dev(cd);
         if (c_dev == NULL) {
                 rc = -ENOENT;
@@ -419,16 +410,7 @@ static int cml_create(const struct lu_env *env,
                  * choosen.
                  */
                 rc = cmm_split_try(env, mo_p);
-                if (rc == -EALREADY) {
-                        /* 
-                         * Dir is already split and we would like to check if
-                         * name came to correct MDT. If not -ERESTART is
-                         * returned by cmm_split_check()
-                         */
-                        rc = cmm_split_check(env, mo_p, child_name);
-                        if (rc)
-                                RETURN(rc);
-                } else if (rc) {
+                if (rc) {
                         /* 
                          * -ERESTART or some split error is returned, we can't
                          * proceed with create.
index 6439ac1..d6bd3e2 100644 (file)
@@ -45,12 +45,6 @@ enum {
         CMM_SPLIT_SIZE =  64 * 1024
 };
 
-enum {
-        CMM_NO_SPLIT_EXPECTED = 0,
-        CMM_EXPECT_SPLIT      = 1,
-        CMM_NOT_SPLITTABLE    = 2
-};
-
 #define CMM_SPLIT_PAGE_COUNT 1
 
 /*
@@ -61,12 +55,15 @@ enum {
 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
                     const char *name)
 {
+        struct cml_object *clo = md2cml_obj(mp);
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
         int rc;
         ENTRY;
         
-        if (cmm->cmm_tgt_count == 0)
+        /* not split yet */
+        if (clo->clo_split == CMM_SPLIT_NONE ||
+            clo->clo_split == CMM_SPLIT_DENIED)
                 RETURN(0);
 
         /* Try to get the LMV EA size */
@@ -75,11 +72,15 @@ int cmm_split_check(const struct lu_env *env, struct md_object *mp,
         rc = mo_attr_get(env, mp, ma);
         if (rc)
                 RETURN(rc);
-        
+
         /* No LMV just return */
-        if (!(ma->ma_valid & MA_LMV))
+        if (!(ma->ma_valid & MA_LMV)) {
+                /* update split state if unknown */
+                if (clo->clo_split == CMM_SPLIT_UNKNOWN)
+                        clo->clo_split = CMM_SPLIT_NONE;
                 RETURN(0);
-
+        }
+        
         LASSERT(ma->ma_lmv_size > 0);
         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
         if (ma->ma_lmv == NULL)
@@ -113,6 +114,13 @@ int cmm_split_check(const struct lu_env *env, struct md_object *mp,
                  */
                 if (idx != 0)
                         rc = -ERESTART;
+                
+                /* update split state to DONE if unknown */
+                if (clo->clo_split == CMM_SPLIT_UNKNOWN)
+                        clo->clo_split = CMM_SPLIT_DONE;
+        } else {
+                /* split is denied for slave dir */
+                clo->clo_split = CMM_SPLIT_DENIED;
         }
         EXIT;
 cleanup:
@@ -147,11 +155,11 @@ int cmm_split_access(const struct lu_env *env, struct md_object *mo,
          * Do not take PDO lock on non-splittable objects if this is not PW,
          * this should speed things up a bit.
          */
-        if (split == CMM_NOT_SPLITTABLE && lm != MDL_PW)
+        if (split == CMM_SPLIT_DONE && lm != MDL_PW)
                 RETURN(MDL_NL);
 
         /* Protect splitting by exclusive lock. */
-        if (split == CMM_EXPECT_SPLIT && lm == MDL_PW)
+        if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
                 RETURN(MDL_EX);
 
         /* 
@@ -160,20 +168,22 @@ int cmm_split_access(const struct lu_env *env, struct md_object *mo,
         RETURN(MDL_MINMODE);
 }
 
-/* Check if split is expected for current thead. */
+/* Check if split is expected for current thread. */
 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
                      struct md_attr *ma, int *split)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        struct cml_object *clo = md2cml_obj(mo);
         struct lu_fid root_fid;
         int rc;
         ENTRY;
 
-        /* No need split for single MDS */
-        if (cmm->cmm_tgt_count == 0) {
-                *split = CMM_NO_SPLIT_EXPECTED;
+        if (clo->clo_split == CMM_SPLIT_DONE ||
+            clo->clo_split == CMM_SPLIT_DENIED) {
+                *split = clo->clo_split;
                 RETURN(0);
         }
+        /* CMM_SPLIT_UNKNOWN case below */
 
         /* No need split for Root object */
         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, &root_fid);
@@ -181,7 +191,8 @@ int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
                 RETURN(rc);
 
         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
-                *split = CMM_NO_SPLIT_EXPECTED;
+                /* update split state */
+                *split = clo->clo_split == CMM_SPLIT_DENIED;
                 RETURN(0);
         }
 
@@ -198,17 +209,17 @@ int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
         /* No need split for already split object */
         if (ma->ma_valid & MA_LMV) {
                 LASSERT(ma->ma_lmv_size > 0);
-                *split = CMM_NOT_SPLITTABLE;
+                *split = clo->clo_split = CMM_SPLIT_DONE;
                 RETURN(0);
         }
 
         /* No need split for object whose size < CMM_SPLIT_SIZE */
         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
-                *split = CMM_NO_SPLIT_EXPECTED;
+                *split = clo->clo_split = CMM_SPLIT_NONE;
                 RETURN(0);
         }
-
-        *split = CMM_EXPECT_SPLIT;
+        
+        *split = clo->clo_split = CMM_SPLIT_NEEDED;
         RETURN(0);
 }
 
@@ -632,18 +643,14 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo)
         if (rc)
                 RETURN(rc);
 
-        if (split == CMM_NOT_SPLITTABLE) {
-                /* Let caller know that dir is already split. */
-                RETURN(-EALREADY);
-        } else if (split == CMM_NO_SPLIT_EXPECTED) {
-                /* No split is expected, caller may proceed with create. */
+        if (split != CMM_SPLIT_NEEDED) {
+                /* No split is needed, caller may proceed with create. */
                 RETURN(0);
-        } else {
-                /* Split should be done now, let's do it. */
-                CWARN("Dir "DFID" is going to split\n",
-                      PFID(lu_object_fid(&mo->mo_lu)));
-                la_size = ma->ma_attr.la_size;
         }
+        
+        /* Split should be done now, let's do it. */
+        CWARN("Dir "DFID" is going to split\n",
+              PFID(lu_object_fid(&mo->mo_lu)));
 
         /*
          * Disable transacrions for split, since there will be so many trans in
@@ -685,6 +692,9 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo)
                 GOTO(cleanup, rc);
         }
 
+        /* set flag in cmm_object */
+        md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
+
         /*
          * Finally, split succeed, tell client to repeat opetartion on correct
          * MDT.