Whamcloud - gitweb
- fixes in split:
authoryury <yury>
Sat, 14 Oct 2006 18:09:17 +0000 (18:09 +0000)
committeryury <yury>
Sat, 14 Oct 2006 18:09:17 +0000 (18:09 +0000)
  - split dir may happen no any MDS - fids of slaves and master should be saved in coherence with mdsnum. Do not save fid of slave on mds0 to mea[2] for instance. This is what LMV needs in some cases and this is what may allow to get rid of doing additional fld_lookups in LMV;

  - in cmm_create_slave_objects() pass to cmm_create_remote_obj() last arg as sizeof(*slave_lmv) instead of sizeof(slave_lmv), otherwise edatalen is not correct and mea on slaves get broken;

  - in many split functions, look like this: for (i = 1; i < cmm->cmm_tgt_count + 1; i++) is not correct even for formet solution, it makes it possible to overwrite/access data out of array memory;

  - in cmm_create_slave_objects() memory leak in case of error (OBD_FREE_PTR(lmv) should be added);
  - cleanups, comments.

lustre/cmm/cmm_internal.h
lustre/cmm/cmm_split.c

index b60c86e..543dd3a 100644 (file)
@@ -118,6 +118,11 @@ static inline struct md_object *cmm2child_obj(struct cmm_object *o)
         return (o ? lu2md(lu_object_next(&o->cmo_obj.mo_lu)) : NULL);
 }
 
+static inline struct lu_fid* cmm2fid(struct cmm_object *obj)
+{
+       return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
+}
+
 struct cmm_thread_info *cmm_env_info(const struct lu_env *env);
 /* cmm_object.c */
 struct lu_object *cmm_object_alloc(const struct lu_env *env,
index 18d59c2..b12a2bc 100644 (file)
@@ -48,11 +48,6 @@ enum {
         SPLIT_SIZE =  64*1024
 };
 
-static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
-{
-       return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
-}
-
 static int cmm_expect_splitting(const struct lu_env *env,
                                 struct md_object *mo,
                                 struct md_attr *ma)
@@ -77,7 +72,7 @@ static int cmm_expect_splitting(const struct lu_env *env,
 
         rc = CMM_EXPECT_SPLIT;
 
-        if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
+        if (lu_fid_eq(fid, cmm2fid(md2cmm_obj(mo))))
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
         EXIT;
@@ -90,45 +85,36 @@ cleanup:
 #define cmm_md_size(stripes) \
        (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
 
-static int cmm_fid_alloc(const struct lu_env *env, struct cmm_device *cmm,
-                         struct lu_fid *fid, int count)
+static int cmm_alloc_slave_fids(const struct lu_env *env,
+                                struct cmm_device *cmm,
+                                struct lu_fid *fids)
 {
-        struct  mdc_device *mc, *tmp;
-        int rc = 0, i = 0;
-
-        LASSERT(count == cmm->cmm_tgt_count);
+        struct lu_site *ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
+        struct mdc_device *mc, *tmp;
+        int rc = 0;
 
         /* 
-         * XXX: in fact here would be nice to protect cmm->cmm_targets but we
+         * XXX: In fact here would be nice to protect cmm->cmm_targets but we
          * can't use spinlock here and do something complex is no time for that,
          * especially taking into account that split will be removed after
          * acceptance. So we suppose no changes to targets should happen this
          * time.
          */
         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
-                LASSERT(cmm->cmm_local_num != mc->mc_num);
-
-                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL);
+                /* Allocate slave fid on mds @mc->mc_num. */
+                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fids[mc->mc_num], NULL);
                 if (rc > 0) {
-                        struct lu_site *ls;
-
-                        ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
                         rc = fld_client_create(ls->ls_client_fld,
-                                               fid_seq(&fid[i]),
+                                               fid_seq(&fids[mc->mc_num]),
                                                mc->mc_num, env);
                         if (rc) {
                                 CERROR("Can't create fld entry, "
                                        "rc %d\n", rc);
                         }
-                }
-                
-                if (rc < 0)
+                } else if (rc < 0) {
                         RETURN(rc);
-                i++;
+                }
         }
-        LASSERT(i == count);
-        if (rc == 1)
-                rc = 0;
         RETURN(rc);
 }
 
@@ -155,19 +141,21 @@ static inline void cmm_object_put(const struct lu_env *env,
         lu_object_put(env, &o->cmo_obj.mo_lu);
 }
 
-static int cmm_creat_remote_obj(const struct lu_env *env,
-                                struct cmm_device *cmm,
-                                struct lu_fid *fid, struct md_attr *ma,
-                                const struct lmv_stripe_md *lmv,
-                                int lmv_size)
+static int cmm_create_remote_obj(const struct lu_env *env,
+                                 struct cmm_device *cmm,
+                                 struct lu_fid *fid, struct md_attr *ma,
+                                 const struct lmv_stripe_md *lmv,
+                                 int lmv_size)
 {
         struct cmm_object *obj;
         struct md_create_spec *spec;
         int rc;
         ENTRY;
 
-        /* XXX Since capablity will not work with split. so we
-         * pass NULL capablity here */
+        /*
+         * XXX: Since capablity will not work with split, we pass NULL capablity
+         * here. Should be fixed later.
+         */
         obj = cmm_object_find(env, cmm, fid);
         if (IS_ERR(obj))
                 RETURN(PTR_ERR(obj));
@@ -187,12 +175,13 @@ static int cmm_creat_remote_obj(const struct lu_env *env,
 }
 
 static int cmm_create_slave_objects(const struct lu_env *env,
-                                    struct md_object *mo, struct md_attr *ma)
+                                    struct md_object *mo,
+                                    struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
+        struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
         int lmv_size, i, rc;
-        struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
         ENTRY;
 
         lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
@@ -206,10 +195,11 @@ static int cmm_create_slave_objects(const struct lu_env *env,
         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
         lmv->mea_count = cmm->cmm_tgt_count + 1;
 
-        lmv->mea_ids[0] = *lf;
+        /* Store master FID to local node idx number. */
+        lmv->mea_ids[cmm->cmm_local_num] = *lf;
 
-        rc = cmm_fid_alloc(env, cmm, &lmv->mea_ids[1],
-                           cmm->cmm_tgt_count);
+        /* Allocate slave fids and setup FLD for them. */
+        rc = cmm_alloc_slave_fids(env, cmm, lmv->mea_ids);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -217,26 +207,34 @@ static int cmm_create_slave_objects(const struct lu_env *env,
         if (!slave_lmv)
                 GOTO(cleanup, rc = -ENOMEM);
 
-        slave_lmv->mea_master = cmm->cmm_local_num;
         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
+        slave_lmv->mea_master = cmm->cmm_local_num;
         slave_lmv->mea_count = 0;
-        for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) {
-                rc = cmm_creat_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
-                                          slave_lmv, sizeof(slave_lmv));
+
+        for (i = 0; i < cmm->cmm_tgt_count + 1; i++) {
+                if (i == cmm->cmm_local_num)
+                        continue;
+                
+                rc = cmm_create_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
+                                           slave_lmv, sizeof(*slave_lmv));
                 if (rc)
                         GOTO(cleanup, rc);
         }
 
         ma->ma_lmv_size = lmv_size;
         ma->ma_lmv = lmv;
+        EXIT;
 cleanup:
         if (slave_lmv)
                 OBD_FREE_PTR(slave_lmv);
-        RETURN(rc);
+        if (rc && lmv)
+                OBD_FREE_PTR(lmv);
+        return rc;
 }
 
 static int cmm_send_split_pages(const struct lu_env *env,
-                                struct md_object *mo, struct lu_rdpg *rdpg,
+                                struct md_object *mo,
+                                struct lu_rdpg *rdpg,
                                 struct lu_fid *fid, int len)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
@@ -254,7 +252,8 @@ static int cmm_send_split_pages(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int cmm_remove_dir_ent(const struct lu_env *env, struct md_object *mo,
+static int cmm_remove_dir_ent(const struct lu_env *env,
+                              struct md_object *mo,
                               struct lu_dirent *ent)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
@@ -384,12 +383,14 @@ static int cmm_split_entries(const struct lu_env *env,
 }
 
 #define SPLIT_PAGE_COUNT 1
+
 static int cmm_scan_and_split(const struct lu_env *env,
-                              struct md_object *mo, struct md_attr *ma)
+                              struct md_object *mo,
+                              struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        struct lu_rdpg *rdpg = NULL;
         __u32 hash_segement;
-        struct lu_rdpg   *rdpg = NULL;
         int rc = 0, i;
 
         OBD_ALLOC_PTR(rdpg);
@@ -399,7 +400,7 @@ static int cmm_scan_and_split(const struct lu_env *env,
         rdpg->rp_npages = SPLIT_PAGE_COUNT;
         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
 
-        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
+        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
         if (rdpg->rp_pages == NULL)
                 GOTO(free_rdpg, rc = -ENOMEM);
 
@@ -410,16 +411,22 @@ static int cmm_scan_and_split(const struct lu_env *env,
         }
 
         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
-        for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
-                struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
+        for (i = 0; i < cmm->cmm_tgt_count + 1; i++) {
+                struct lu_fid *lf;
                 __u32 hash_end;
 
+                if (i == cmm->cmm_local_num)
+                        continue;
+                
+                lf = &ma->ma_lmv->mea_ids[i];
+
                 rdpg->rp_hash = i * hash_segement;
                 hash_end = rdpg->rp_hash + hash_segement;
                 rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
                 if (rc)
                         GOTO(cleanup, rc);
         }
+        EXIT;
 cleanup:
         for (i = 0; i < rdpg->rp_npages; i++)
                 if (rdpg->rp_pages[i] != NULL)
@@ -431,7 +438,7 @@ free_rdpg:
         if (rdpg)
                 OBD_FREE_PTR(rdpg);
 
-        RETURN(rc);
+        return rc;
 }
 
 static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
@@ -466,9 +473,10 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
         if (rc != CMM_EXPECT_SPLIT)
                 GOTO(cleanup, rc = 0);
 
-        /* Disable trans for splitting, since there will be
-         * so many trans in this one ops, confilct with current
-         * recovery design */
+        /*
+         * Disable trans for splitting, since there will be so many trans in
+         * this one ops, confilct with current recovery design.
+         */
         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
         if (rc)
                 GOTO(cleanup, rc = 0);
@@ -484,15 +492,17 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
                 GOTO(cleanup, ma);
 
         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
+        
         /* step4: set mea to the master object */
         rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0);
         if (rc == -ERESTART)
                 CWARN("Dir "DFID" has been split\n",
                       PFID(lu_object_fid(&mo->mo_lu)));
+        EXIT;
 cleanup:
         if (ma->ma_lmv_size && ma->ma_lmv)
                 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
         
-        RETURN(rc);
+        return rc;
 }