From 78310d6d1742cc9b8ef757a8fd4b1718e5f72193 Mon Sep 17 00:00:00 2001
From: yury
Date: Sat, 14 Oct 2006 18:09:17 +0000
Subject: [PATCH] - fixes in split:

- dir split may happen on any MDS;
- fids of the slaves and of the master should be saved in accordance with
  their mds number; do not save the fid of a slave living on mds0 into
  mea[2], for instance. This is what LMV needs in some cases and it may
  allow us to get rid of the additional fld_lookups done in LMV;
- in cmm_create_slave_objects(), pass sizeof(*slave_lmv) instead of
  sizeof(slave_lmv) as the last argument to cmm_create_remote_obj();
  otherwise edatalen is not correct and the mea on the slaves gets broken;
- in many split functions, loops of the form
  for (i = 1; i < cmm->cmm_tgt_count + 1; i++) are not correct even for the
  former scheme; they make it possible to access or overwrite data outside
  the array memory;
- in cmm_create_slave_objects(), fix a memory leak on the error path
  (an OBD_FREE_PTR(lmv) call was missing);
- cleanups, comments.
---
 lustre/cmm/cmm_internal.h |   5 ++
 lustre/cmm/cmm_split.c    | 122 +++++++++++++++++++++++++---------------------
 2 files changed, 71 insertions(+), 56 deletions(-)

diff --git a/lustre/cmm/cmm_internal.h b/lustre/cmm/cmm_internal.h
index b60c86e..543dd3a 100644
--- a/lustre/cmm/cmm_internal.h
+++ b/lustre/cmm/cmm_internal.h
@@ -118,6 +118,11 @@ static inline struct md_object *cmm2child_obj(struct cmm_object *o)
         return (o ? lu2md(lu_object_next(&o->cmo_obj.mo_lu)) : NULL);
 }
 
+static inline struct lu_fid* cmm2fid(struct cmm_object *obj)
+{
+        return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
+}
+
 struct cmm_thread_info *cmm_env_info(const struct lu_env *env);
 /* cmm_object.c */
 struct lu_object *cmm_object_alloc(const struct lu_env *env,
diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c
index 18d59c2..b12a2bc4 100644
--- a/lustre/cmm/cmm_split.c
+++ b/lustre/cmm/cmm_split.c
@@ -48,11 +48,6 @@ enum {
         SPLIT_SIZE = 64*1024
 };
 
-static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
-{
-        return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
-}
-
 static int cmm_expect_splitting(const struct lu_env *env,
                                 struct md_object *mo,
                                 struct md_attr *ma)
@@ -77,7 +72,7 @@ static int cmm_expect_splitting(const struct lu_env *env,
 
         rc = CMM_EXPECT_SPLIT;
 
-        if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
+        if (lu_fid_eq(fid, cmm2fid(md2cmm_obj(mo))))
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
         EXIT;
@@ -90,45 +85,36 @@ cleanup:
 #define cmm_md_size(stripes) \
         (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
 
-static int cmm_fid_alloc(const struct lu_env *env, struct cmm_device *cmm,
-                         struct lu_fid *fid, int count)
+static int cmm_alloc_slave_fids(const struct lu_env *env,
+                                struct cmm_device *cmm,
+                                struct lu_fid *fids)
 {
-        struct mdc_device *mc, *tmp;
-        int rc = 0, i = 0;
-
-        LASSERT(count == cmm->cmm_tgt_count);
+        struct lu_site *ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
+        struct mdc_device *mc, *tmp;
+        int rc = 0;
 
         /*
-         * XXX: in fact here would be nice to protect cmm->cmm_targets but we
+         * XXX: In fact here would be nice to protect cmm->cmm_targets but we
          * can't use spinlock here and do something complex is no time for that,
          * especially taking into account that split will be removed after
         * acceptance. So we suppose no changes to targets should happen this
          * time.
          */
         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
-                LASSERT(cmm->cmm_local_num != mc->mc_num);
-
-                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL);
+                /* Allocate slave fid on mds @mc->mc_num. */
+                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fids[mc->mc_num], NULL);
                 if (rc > 0) {
-                        struct lu_site *ls;
-
-                        ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
                         rc = fld_client_create(ls->ls_client_fld,
-                                               fid_seq(&fid[i]),
+                                               fid_seq(&fids[mc->mc_num]),
                                                mc->mc_num, env);
                         if (rc) {
                                 CERROR("Can't create fld entry, "
                                        "rc %d\n", rc);
                         }
-                }
-
-                if (rc < 0)
+                } else if (rc < 0) {
                         RETURN(rc);
-                i++;
+                }
         }
-        LASSERT(i == count);
-        if (rc == 1)
-                rc = 0;
         RETURN(rc);
 }
 
@@ -155,19 +141,21 @@ static inline void cmm_object_put(const struct lu_env *env,
         lu_object_put(env, &o->cmo_obj.mo_lu);
 }
 
-static int cmm_creat_remote_obj(const struct lu_env *env,
-                                struct cmm_device *cmm,
-                                struct lu_fid *fid, struct md_attr *ma,
-                                const struct lmv_stripe_md *lmv,
-                                int lmv_size)
+static int cmm_create_remote_obj(const struct lu_env *env,
+                                 struct cmm_device *cmm,
+                                 struct lu_fid *fid, struct md_attr *ma,
+                                 const struct lmv_stripe_md *lmv,
+                                 int lmv_size)
 {
         struct cmm_object *obj;
         struct md_create_spec *spec;
         int rc;
         ENTRY;
 
-        /* XXX Since capablity will not work with split. so we
-         * pass NULL capablity here */
+        /*
+         * XXX: Since capablity will not work with split, we pass NULL capablity
+         * here. Should be fixed later.
+         */
         obj = cmm_object_find(env, cmm, fid);
         if (IS_ERR(obj))
                 RETURN(PTR_ERR(obj));
@@ -187,12 +175,13 @@ static int cmm_creat_remote_obj(const struct lu_env *env,
 }
 
 static int cmm_create_slave_objects(const struct lu_env *env,
-                                    struct md_object *mo, struct md_attr *ma)
+                                    struct md_object *mo,
+                                    struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
+        struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
         int lmv_size, i, rc;
-        struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
         ENTRY;
 
         lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
@@ -206,10 +195,11 @@ static int cmm_create_slave_objects(const struct lu_env *env,
 
         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
         lmv->mea_count = cmm->cmm_tgt_count + 1;
-        lmv->mea_ids[0] = *lf;
+        /* Store master FID to local node idx number. */
+        lmv->mea_ids[cmm->cmm_local_num] = *lf;
 
-        rc = cmm_fid_alloc(env, cmm, &lmv->mea_ids[1],
-                           cmm->cmm_tgt_count);
+        /* Allocate slave fids and setup FLD for them. */
+        rc = cmm_alloc_slave_fids(env, cmm, lmv->mea_ids);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -217,26 +207,34 @@ static int cmm_create_slave_objects(const struct lu_env *env,
         if (!slave_lmv)
                 GOTO(cleanup, rc = -ENOMEM);
 
-        slave_lmv->mea_master = cmm->cmm_local_num;
-        slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
+        slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
+        slave_lmv->mea_master = cmm->cmm_local_num;
         slave_lmv->mea_count = 0;
-        for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) {
-                rc = cmm_creat_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
-                                          slave_lmv, sizeof(slave_lmv));
+
+        for (i = 0; i < cmm->cmm_tgt_count + 1; i++) {
+                if (i == cmm->cmm_local_num)
+                        continue;
+
+                rc = cmm_create_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
+                                           slave_lmv, sizeof(*slave_lmv));
                 if (rc)
                         GOTO(cleanup, rc);
         }
 
         ma->ma_lmv_size = lmv_size;
         ma->ma_lmv = lmv;
+        EXIT;
 cleanup:
         if (slave_lmv)
                 OBD_FREE_PTR(slave_lmv);
-        RETURN(rc);
+        if (rc && lmv)
+                OBD_FREE_PTR(lmv);
+        return rc;
 }
 
 static int cmm_send_split_pages(const struct lu_env *env,
-                                struct md_object *mo, struct lu_rdpg *rdpg,
+                                struct md_object *mo,
+                                struct lu_rdpg *rdpg,
                                 struct lu_fid *fid, int len)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
@@ -254,7 +252,8 @@ static int cmm_send_split_pages(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int cmm_remove_dir_ent(const struct lu_env *env, struct md_object *mo,
+static int cmm_remove_dir_ent(const struct lu_env *env,
+                              struct md_object *mo,
                               struct lu_dirent *ent)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
@@ -384,12 +383,14 @@ static int cmm_split_entries(const struct lu_env *env,
 }
 
 #define SPLIT_PAGE_COUNT 1
+
 static int cmm_scan_and_split(const struct lu_env *env,
-                              struct md_object *mo, struct md_attr *ma)
+                              struct md_object *mo,
+                              struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        struct lu_rdpg *rdpg = NULL;
         __u32 hash_segement;
-        struct lu_rdpg *rdpg = NULL;
         int rc = 0, i;
 
         OBD_ALLOC_PTR(rdpg);
@@ -399,7 +400,7 @@ static int cmm_scan_and_split(const struct lu_env *env,
 
         rdpg->rp_npages = SPLIT_PAGE_COUNT;
         rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;
-        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
+        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
         if (rdpg->rp_pages == NULL)
                 GOTO(free_rdpg, rc = -ENOMEM);
 
@@ -410,16 +411,22 @@ static int cmm_scan_and_split(const struct lu_env *env,
         }
 
         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
-        for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
-                struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
+        for (i = 0; i < cmm->cmm_tgt_count + 1; i++) {
+                struct lu_fid *lf;
                 __u32 hash_end;
 
+                if (i == cmm->cmm_local_num)
+                        continue;
+
+                lf = &ma->ma_lmv->mea_ids[i];
+
                 rdpg->rp_hash = i * hash_segement;
                 hash_end = rdpg->rp_hash + hash_segement;
                 rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
                 if (rc)
                         GOTO(cleanup, rc);
         }
+        EXIT;
 cleanup:
         for (i = 0; i < rdpg->rp_npages; i++)
                 if (rdpg->rp_pages[i] != NULL)
@@ -431,7 +438,7 @@ free_rdpg:
         if (rdpg)
                 OBD_FREE_PTR(rdpg);
 
-        RETURN(rc);
+        return rc;
 }
 
 static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
@@ -466,9 +473,10 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
         if (rc != CMM_EXPECT_SPLIT)
                 GOTO(cleanup, rc = 0);
 
-        /* Disable trans for splitting, since there will be
-         * so many trans in this one ops, confilct with current
-         * recovery design */
+        /*
+         * Disable trans for splitting, since there will be so many trans in
+         * this one ops, confilct with current recovery design.
+ */ rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS); if (rc) GOTO(cleanup, rc = 0); @@ -484,15 +492,17 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo) GOTO(cleanup, ma); buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size); + /* step4: set mea to the master object */ rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0); if (rc == -ERESTART) CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu))); + EXIT; cleanup: if (ma->ma_lmv_size && ma->ma_lmv) OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); - RETURN(rc); + return rc; } -- 1.8.3.1