Whamcloud - gitweb
Branch:b_new_cmd
authorwangdi <wangdi>
Sun, 29 Oct 2006 07:11:22 +0000 (07:11 +0000)
committerwangdi <wangdi>
Sun, 29 Oct 2006 07:11:22 +0000 (07:11 +0000)
cleanup in cmm split, after pdo added.

lustre/cmm/cmm_internal.h
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c

index 41f2e94..99b9f5e 100644 (file)
 #include <md_object.h>
 #include <linux/lustre_acl.h>
 
-#define CMM_NO_SPLIT_EXPECTED   0
-#define CMM_EXPECT_SPLIT        1
-#define CMM_NOT_SPLITTABLE      2
-
-enum {
-        CMM_SPLIT_SIZE =  64 * 1024
-};
 
 struct cmm_device {
         struct md_device      cmm_md_dev;
@@ -154,7 +147,8 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
                          struct md_attr *ma, int *split);
 
 int cmm_try_to_split(const struct lu_env *env, struct md_object *mo);
-
+int cmm_split_lock_mode(const struct lu_env *env, struct md_object *mo,
+                        mdl_mode_t lm);
 #endif
 
 #endif /* __KERNEL__ */
index c1fe0d2..c73f836 100644 (file)
@@ -375,47 +375,11 @@ static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
                                 struct md_object *mo, mdl_mode_t lm)
 {
+        int rc = MDL_MINMODE;
 #ifdef HAVE_SPLIT_SUPPORT
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
-        int rc, split, lmv_size;
-        ENTRY;
-
-        memset(ma, 0, sizeof(*ma));
-        lmv_size = ma->ma_lmv_size = (sizeof(struct lmv_stripe_md) + 
-                        (cmm->cmm_tgt_count + 1) * sizeof(struct lu_fid));
-
-        OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
-        if (ma->ma_lmv == NULL)
-                RETURN(-ENOMEM);
-
-        /*
-         * Check only if we need protection from split. If not - mdt handles
-         * other cases.
-         */
-        rc = cmm_expect_splitting(env, mo, ma, &split); 
-        OBD_FREE(ma->ma_lmv, lmv_size);
-        if (rc) {
-                CERROR("Can't check for possible split, error %d\n",
-                       rc);
-                RETURN(MDL_MINMODE);
-        }
-        
-        /*
-         * Do not take PDO lock on non-splittable objects if this is not PW,
-         * this should speed things up a bit.
-         */
-        if (split == CMM_NOT_SPLITTABLE && lm != MDL_PW)
-                RETURN(MDL_NL);
-
-        /* Protect splitting by exclusive lock. */
-        if (split == CMM_EXPECT_SPLIT && lm == MDL_PW)
-                RETURN(MDL_EX);
-
-        /* Have no idea about lock mode, let it be what higher layer wants. */
-        RETURN(MDL_MINMODE);
+        rc = cmm_split_lock_mode(env, mo, lm);
 #endif
-        return MDL_MINMODE;
+        return rc;
 }
 
 static int cml_create(const struct lu_env *env,
index 726a616..6f344d5 100644 (file)
 #include "cmm_internal.h"
 #include "mdc_internal.h"
 
-static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
-                                  ssize_t len)
-{
-        struct lu_buf *buf;
-
-        buf = &cmm_env_info(env)->cmi_buf;
-        buf->lb_buf = area;
-        buf->lb_len = len;
-        return buf;
-}
+enum {
+        CMM_SPLIT_SIZE =  64 * 1024
+};
 
-int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
-                     const char *name)
-{
-        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
-        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
-        int rc;
-        ENTRY;
-        
-        if (cmm->cmm_tgt_count == 0)
-                RETURN(0);
-
-        /* Try to get the LMV EA size */
-        memset(ma, 0, sizeof(*ma));
-        ma->ma_need = MA_LMV;
-        rc = mo_attr_get(env, mp, ma);
-        if (rc)
-                RETURN(rc);
-
-        if (ma->ma_valid & MA_LMV) {
-                int stripe;
-
-                /* 
-                 * Clean MA_LMV in ->ma_valid because mdd will do nothing
-                 * counting that EA is already taken.
-                 */
-                ma->ma_valid &= ~MA_LMV;
-
-                LASSERT(ma->ma_lmv_size > 0);
-                OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
-                if (ma->ma_lmv == NULL)
-                        RETURN(-ENOMEM);
-
-                /* Get LMV EA */
-                ma->ma_need = MA_LMV;
-                rc = mo_attr_get(env, mp, ma);
-                
-                /* Skip checking the slave dirs (mea_count is 0) */
-                if (rc == 0 && ma->ma_lmv->mea_count != 0) {
-                        /* 
-                         * Get stripe by name to check the name belongs to
-                         * master dir, otherwise return the -ERESTART
-                         */
-                        stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
-                
-                        /* Master stripe is always 0 */
-                        if (stripe != 0)
-                                rc = -ERESTART;
-                }
-                OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
-        }
-        RETURN(rc);
-}
+enum {
+        CMM_NO_SPLIT_EXPECTED = 0,
+        CMM_EXPECT_SPLIT      = 1,
+        CMM_NOT_SPLITTABLE    = 2
+};
 
 int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
                          struct md_attr *ma, int *split)
@@ -112,17 +58,14 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
         int rc;
         ENTRY;
 
-        /* 
-         * Check first most light things like tgt count and root fid. For some
-         * case this style should yeild better performance.
-         */
+        /* No need split for single MDS */
         if (cmm->cmm_tgt_count == 0) {
                 *split = CMM_NO_SPLIT_EXPECTED;
                 RETURN(0);
         }
 
-        rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
-                                              &root_fid);
+        /* No need split for Root object */
+        rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, &root_fid);
         if (rc)
                 RETURN(rc);
 
@@ -131,25 +74,29 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
                 RETURN(0);
         }
 
-        /* 
-         * MA_INODE is needed to check inode size. 
-         * Memory is prepared by caller.
+        /*
+         * Assumption: ma_valid = 0 here, we only need get
+         * inode and lmv_size for this get_attr
          */
+        LASSERT(ma->ma_valid == 0); 
         ma->ma_need = MA_INODE | MA_LMV;
         rc = mo_attr_get(env, mo, ma);
         if (rc)
                 RETURN(rc);
-        
+
+        /* No need split for already split object */
         if (ma->ma_valid & MA_LMV) {
+                LASSERT(ma->ma_lmv_size > 0);
                 *split = CMM_NOT_SPLITTABLE;
                 RETURN(0);
         }
-                
+
+        /* No need split for object whose size < CMM_SPLIT_SIZE */
         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
                 *split = CMM_NO_SPLIT_EXPECTED;
                 RETURN(0);
         }
-        
+
         *split = CMM_EXPECT_SPLIT;
         RETURN(0);
 }
@@ -177,12 +124,12 @@ static inline void cmm_object_put(const struct lu_env *env,
         lu_object_put(env, &o->cmo_obj.mo_lu);
 }
 
-static int cmm_object_create(const struct lu_env *env,
-                             struct cmm_device *cmm,
-                             struct lu_fid *fid,
-                             struct md_attr *ma,
-                             struct lmv_stripe_md *lmv,
-                             int lmv_size)
+static int cmm_slave_object_create(const struct lu_env *env, 
+                                   struct cmm_device *cmm,
+                                   struct lu_fid *fid, 
+                                   struct md_attr *ma,
+                                   struct lmv_stripe_md *lmv,
+                                   int lmv_size)
 {
         struct md_create_spec *spec;
         struct cmm_object *obj;
@@ -207,20 +154,17 @@ static int cmm_object_create(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int cmm_fid_alloc(const struct lu_env *env,
-                         struct cmm_device *cmm,
-                         struct mdc_device *mc,
-                         struct lu_fid *fid)
+static int cmm_slave_fid_alloc(const struct lu_env *env, 
+                               struct cmm_device *cmm,
+                               struct mdc_device *mc, 
+                               struct lu_fid *fid)
 {
         int rc;
         ENTRY;
 
-        LASSERT(cmm != NULL);
-        LASSERT(mc != NULL);
-        LASSERT(fid != NULL);
+        LASSERT(cmm != NULL && mc != NULL && fid != NULL);
 
         down(&mc->mc_fid_sem);
-
         /* Alloc new fid on @mc. */
         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
         if (rc > 0) {
@@ -248,6 +192,7 @@ static int cmm_slaves_create(const struct lu_env *env,
         ENTRY;
 
         lmv = ma->ma_lmv;
+        /* Init the split MEA */
         lmv->mea_master = cmm->cmm_local_num;
         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
         lmv->mea_count = cmm->cmm_tgt_count + 1;
@@ -265,7 +210,7 @@ static int cmm_slaves_create(const struct lu_env *env,
 
         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
                 /* Alloc fid for slave object. */
-                rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
+                rc = cmm_slave_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
                 if (rc) {
                         CERROR("Can't alloc fid for slave "LPU64", rc %d\n",
                                mc->mc_num, rc);
@@ -273,13 +218,14 @@ static int cmm_slaves_create(const struct lu_env *env,
                 }
 
                 /* Create slave on remote MDT. */
-                rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma,
-                                       slave_lmv, sizeof(*slave_lmv));
+                rc = cmm_slave_object_create(env, cmm, &lmv->mea_ids[i], ma,
+                                             slave_lmv, sizeof(*slave_lmv));
                 if (rc)
                         GOTO(cleanup, rc);
                 i++;
         }
 
+        ma->ma_valid |= MA_LMV;
         EXIT;
 cleanup:
         OBD_FREE_PTR(slave_lmv);
@@ -306,9 +252,9 @@ static int cmm_send_split_pages(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int cmm_remove_dir_ent(const struct lu_env *env,
-                              struct md_object *mo,
-                              struct lu_dirent *ent)
+static int cmm_remove_split_ent(const struct lu_env *env,
+                                struct md_object *mo,
+                                struct lu_dirent *ent)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct cmm_object *obj;
@@ -356,7 +302,8 @@ cleanup:
 }
 
 static int cmm_remove_entries(const struct lu_env *env,
-                              struct md_object *mo, struct lu_rdpg *rdpg,
+                              struct md_object *mo,
+                              struct lu_rdpg *rdpg,
                               __u32 hash_end, __u32 *len)
 {
         struct lu_dirpage *dp;
@@ -366,40 +313,42 @@ static int cmm_remove_entries(const struct lu_env *env,
 
         kmap(rdpg->rp_pages[0]);
         dp = page_address(rdpg->rp_pages[0]);
-        for (ent = lu_dirent_start(dp); ent != NULL;
+        *len = 0;
+
+        for (ent = lu_dirent_start(dp);
+             ent != NULL && ent->lde_hash < hash_end;
              ent = lu_dirent_next(ent)) {
-                if (ent->lde_hash < hash_end) {
-                        rc = cmm_remove_dir_ent(env, mo, ent);
-                        if (rc) {
-                                CERROR("Can not del %s rc %d\n", ent->lde_name,
-                                                                 rc);
-                                GOTO(unmap, rc);
-                        }
-                } else {
-                        if (ent != lu_dirent_start(dp))
-                                *len = (int)((__u32)ent - (__u32)dp);
-                        else
-                                *len = 0;
+                rc = cmm_remove_split_ent(env, mo, ent);
+                if (rc) {
+                        /* 
+                         * XXX Error handler to insert remove name back,
+                         * currently we assumed it will success anyway
+                         * in verfication test. 
+                         * 
+                         */
+                        CERROR("Can not del %*.*s rc %d\n", ent->lde_namelen,
+                                ent->lde_namelen, ent->lde_name, rc);
                         GOTO(unmap, rc);
                 }
+                *len += le16_to_cpu(ent->lde_reclen);
         }
-        *len = CFS_PAGE_SIZE;
+        if (ent != lu_dirent_start(dp))
+                *len += sizeof(struct lu_dirpage);
         EXIT;
 unmap:
         kunmap(rdpg->rp_pages[0]);
         return rc;
 }
 
-static int cmm_split_entries(const struct lu_env *env,
-                             struct md_object *mo, struct lu_rdpg *rdpg,
+static int cmm_split_entries(const struct lu_env *env, 
+                             struct md_object *mo,
+                             struct lu_rdpg *rdpg, 
                              struct lu_fid *lf, __u32 end)
 {
         int rc, done = 0;
         ENTRY;
 
-        LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time"
-                 "npages %d\n", rdpg->rp_npages);
-
+        LASSERT(rdpg->rp_npages == 1);
         /* Read split page and send them to the slave master. */
         do {
                 struct lu_dirpage *ldp;
@@ -444,7 +393,6 @@ static int cmm_split_entries(const struct lu_env *env,
 }
 
 #define SPLIT_PAGE_COUNT 1
-
 static int cmm_scan_and_split(const struct lu_env *env,
                               struct md_object *mo,
                               struct md_attr *ma)
@@ -471,6 +419,7 @@ static int cmm_scan_and_split(const struct lu_env *env,
                         GOTO(cleanup, rc = -ENOMEM);
         }
 
+        LASSERT(ma->ma_valid & MA_LMV);
         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
                 struct lu_fid *lf;
@@ -503,41 +452,41 @@ free_rdpg:
         return rc;
 }
 
-#define cmm_md_size(stripes) \
-       (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
+static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
+                                  ssize_t len)
+{
+        struct lu_buf *buf;
+
+        buf = &cmm_env_info(env)->cmi_buf;
+        buf->lb_buf = area;
+        buf->lb_len = len;
+        return buf;
+}
+
+#define CMM_MD_SIZE(stripes)  (sizeof(struct lmv_stripe_md) +  \
+                               (stripes) * sizeof(struct lu_fid))
 
 int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
         struct lu_buf     *buf;
-        int rc = 0, split, lmv_size;
+        int rc = 0, split;
         ENTRY;
 
         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
-
         memset(ma, 0, sizeof(*ma));
-        lmv_size = ma->ma_lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
-
-        /* 
-         * Preparing memory for LMV. This will be freed after finish splitting.
-         */
-        OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
-        if (ma->ma_lmv == NULL)
-                RETURN(-ENOMEM);
-
         /* Step1: Checking whether the dir needs to be split. */
         rc = cmm_expect_splitting(env, mo, ma, &split);
         if (rc)
-                GOTO(cleanup, rc);
-        
+                RETURN(rc);
         if (split != CMM_EXPECT_SPLIT)
-                GOTO(cleanup, rc = 0);
+                RETURN(0);
 
         LASSERTF(mo->mo_pdo_mode == MDL_EX, "Split is only valid if "
                  "dir is protected by MDL_EX lock. Lock mode 0x%x\n",
                  (int)mo->mo_pdo_mode);
-        
+
         /*
          * Disable trans for splitting, since there will be so many trans in
          * this one ops, confilct with current recovery design.
@@ -548,38 +497,128 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
                 GOTO(cleanup, rc);
         }
 
-        /* Step2: Create slave objects (on slave MDTs) */
-        ma->ma_valid = 0;
-        ma->ma_lmv_size = lmv_size;
+        /* step2: Prepare the md memory */
+        ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
+        OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
+        if (ma->ma_lmv == NULL)
+                RETURN(-ENOMEM);
+
+        /* Step3: Create slave objects and fill the ma->ma_lmv */
         rc = cmm_slaves_create(env, mo, ma);
         if (rc) {
                 CERROR("Can't create slaves for split, rc %d\n", rc);
                 GOTO(cleanup, rc);
         }
 
-        /* Step3: Scan and split the object. */
+        /* Step4: Scan and split the object. */
         rc = cmm_scan_and_split(env, mo, ma);
         if (rc) {
                 CERROR("Can't scan and split, rc %d\n", rc);
                 GOTO(cleanup, rc);
         }
 
+        /* Step5: Set mea to the master object. */
+        LASSERT(ma->ma_valid & MA_LMV);
         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
-        
-        /* Step4: Set mea to the master object. */
         rc = mo_xattr_set(env, md_object_next(mo), buf,
                           MDS_LMV_MD_NAME, 0);
-        if (rc == 0) {
-                CWARN("Dir "DFID" has been split\n",
-                      PFID(lu_object_fid(&mo->mo_lu)));
-                rc = -ERESTART;
-        } else {
-                CERROR("Can't set MEA to master dir, "
-                       "rc %d\n", rc);
+        if (rc) {
+                CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
+                GOTO(cleanup, rc);
         }
-        EXIT;
+
+        /* Finally, split succeed, tell client to recreate the object */
+        CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
+        rc = -ERESTART;
 cleanup:
-        OBD_FREE(ma->ma_lmv, lmv_size);
-        return rc;
+        OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
+        RETURN(rc);
+}
+
+int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
+                     const char *name)
+{
+        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
+        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+        int rc;
+        ENTRY;
+        
+        if (cmm->cmm_tgt_count == 0)
+                RETURN(0);
+
+        /* Try to get the LMV EA size */
+        memset(ma, 0, sizeof(*ma));
+        ma->ma_need = MA_LMV;
+        rc = mo_attr_get(env, mp, ma);
+        if (rc)
+                RETURN(rc);
+        /* No LMV just return */
+        if (!(ma->ma_valid & MA_LMV))
+                RETURN(rc);
+
+        LASSERT(ma->ma_lmv_size > 0);
+        OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
+        if (ma->ma_lmv == NULL)
+                RETURN(-ENOMEM);
+
+        /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
+        ma->ma_valid &= ~MA_LMV;
+        ma->ma_need = MA_LMV;
+        rc = mo_attr_get(env, mp, ma);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        /* Skip checking the slave dirs (mea_count is 0) */
+        if (ma->ma_lmv->mea_count != 0) {
+                int stripe;
+                /* 
+                 * Get stripe by name to check the name belongs to
+                 * master dir, otherwise return the -ERESTART
+                 */
+                stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
+
+                /* Master stripe is always 0 */
+                if (stripe != 0)
+                        rc = -ERESTART;
+        }
+cleanup:
+        OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
+        RETURN(rc);
+}
+
+int cmm_split_lock_mode(const struct lu_env *env, struct md_object *mo,
+                        mdl_mode_t lm)
+{
+        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+        int rc, split;
+        ENTRY;
+
+        memset(ma, 0, sizeof(*ma));
+        /*
+         * Check only if we need protection from split. 
+         * If not - mdt handles other cases.
+         */
+        rc = cmm_expect_splitting(env, mo, ma, &split);
+        if (rc) {
+                CERROR("Can't check for possible split, rc %d\n", rc);
+                RETURN(MDL_MINMODE);
+        }
+
+        /*
+         * Do not take PDO lock on non-splittable objects if 
+         * this is not PW, this should speed things up a bit.
+         */
+        if (split == CMM_NOT_SPLITTABLE && lm != MDL_PW)
+                RETURN(MDL_NL);
+
+        /* Protect splitting by exclusive lock. */
+        if (split == CMM_EXPECT_SPLIT && lm == MDL_PW)
+                RETURN(MDL_EX);
+
+        /* 
+         * XXX Have no idea about lock mode, let it be 
+         * what higher layer wants. 
+         */
+        RETURN(MDL_MINMODE);
 }