From a5496185ac55fc311306fb72521c3ae3082ee263 Mon Sep 17 00:00:00 2001 From: huanghua Date: Sat, 28 Oct 2006 15:37:19 +0000 Subject: [PATCH] some code cleanup in split. --- lustre/cmm/cmm_object.c | 12 ++++++- lustre/cmm/cmm_split.c | 91 +++++++++++++++++++++++++++---------------------- lustre/cmm/mdc_object.c | 2 +- lustre/mdd/mdd_lov.c | 23 ++++++++----- lustre/mdd/mdd_object.c | 6 ++-- 5 files changed, 80 insertions(+), 54 deletions(-) diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index 04edafc..c1fe0d2 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -376,15 +376,25 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env, struct md_object *mo, mdl_mode_t lm) { #ifdef HAVE_SPLIT_SUPPORT + struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct md_attr *ma = &cmm_env_info(env)->cmi_ma; - int rc, split; + int rc, split, lmv_size; ENTRY; + memset(ma, 0, sizeof(*ma)); + lmv_size = ma->ma_lmv_size = (sizeof(struct lmv_stripe_md) + + (cmm->cmm_tgt_count + 1) * sizeof(struct lu_fid)); + + OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size); + if (ma->ma_lmv == NULL) + RETURN(-ENOMEM); + /* * Check only if we need protection from split. If not - mdt handles * other cases. */ rc = cmm_expect_splitting(env, mo, ma, &split); + OBD_FREE(ma->ma_lmv, lmv_size); if (rc) { CERROR("Can't check for possible split, error %d\n", rc); diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index baf7ba0..726a616 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -131,8 +131,10 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, RETURN(0); } - /* MA_INODE is needed to check inode size. */ - memset(ma, 0, sizeof(*ma)); + /* + * MA_INODE is needed to check inode size. + * Memory is prepared by caller. + */ ma->ma_need = MA_INODE | MA_LMV; rc = mo_attr_get(env, mo, ma); if (rc) @@ -152,9 +154,6 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, RETURN(0); } -#define cmm_md_size(stripes) \ - (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid)) - struct cmm_object *cmm_object_find(const struct lu_env *env, struct cmm_device *d, const struct lu_fid *f) @@ -240,20 +239,15 @@ static int cmm_slaves_create(const struct lu_env *env, struct md_object *mo, struct md_attr *ma) { - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL; - struct lu_fid *lf = cmm2fid(md2cmm_obj(mo)); - struct mdc_device *mc, *tmp; - int lmv_size, i = 1, rc = 0; + struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); + struct lu_fid *lf = cmm2fid(md2cmm_obj(mo)); + struct lmv_stripe_md *lmv; + struct lmv_stripe_md *slave_lmv = NULL; + struct mdc_device *mc, *tmp; + int i = 1, rc = 0; ENTRY; - lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1); - - /* This lmv will free after finish splitting. */ - OBD_ALLOC(lmv, lmv_size); - if (!lmv) - RETURN(-ENOMEM); - + lmv = ma->ma_lmv; lmv->mea_master = cmm->cmm_local_num; lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT; lmv->mea_count = cmm->cmm_tgt_count + 1; @@ -262,8 +256,8 @@ static int cmm_slaves_create(const struct lu_env *env, lmv->mea_ids[0] = *lf; OBD_ALLOC_PTR(slave_lmv); - if (!slave_lmv) - GOTO(cleanup, rc = -ENOMEM); + if (slave_lmv == NULL) + RETURN(-ENOMEM); slave_lmv->mea_master = cmm->cmm_local_num; slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT; @@ -286,17 +280,9 @@ static int cmm_slaves_create(const struct lu_env *env, i++; } - ma->ma_lmv_size = lmv_size; - ma->ma_lmv = lmv; EXIT; cleanup: - if (slave_lmv) - OBD_FREE_PTR(slave_lmv); - if (rc && lmv) { - OBD_FREE(lmv, lmv_size); - ma->ma_lmv = NULL; - ma->ma_lmv_size = 0; - } + OBD_FREE_PTR(slave_lmv); return rc; } @@ -424,19 +410,25 @@ static int cmm_split_entries(const struct lu_env *env, kunmap(rdpg->rp_pages[0]); rc = mo_readpage(env, md_object_next(mo), rdpg); - if (rc) + if (rc) { + CERROR("Error in readpage: %d\n", rc); RETURN(rc); + } /* Remove the old entries */ rc = cmm_remove_entries(env, mo, rdpg, end, &len); - if (rc) + if (rc) { + CERROR("Error in remove entry: %d\n", rc); RETURN(rc); + } /* Send page to slave object */ if (len > 0) { rc = cmm_send_split_pages(env, mo, rdpg, lf, len); - if (rc) + if (rc) { + CERROR("Error in sending pages: %d\n", rc); RETURN(rc); + } } kmap(rdpg->rp_pages[0]); @@ -489,8 +481,12 @@ static int cmm_scan_and_split(const struct lu_env *env, rdpg->rp_hash = i * hash_segement; hash_end = rdpg->rp_hash + hash_segement; rc = cmm_split_entries(env, mo, rdpg, lf, hash_end); - if (rc) + if (rc) { + CERROR("Error (rc=%d) while splitting for %d: fid=" + DFID", %08x:%08x\n", rc, i, PFID(lf), + rdpg->rp_hash, hash_end); GOTO(cleanup, rc); + } } EXIT; cleanup: @@ -507,23 +503,36 @@ free_rdpg: return rc; } +#define cmm_md_size(stripes) \ + (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid)) + int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct md_attr *ma = &cmm_env_info(env)->cmi_ma; - struct lu_buf *buf; - int rc = 0, split; + struct md_attr *ma = &cmm_env_info(env)->cmi_ma; + struct lu_buf *buf; + int rc = 0, split, lmv_size; ENTRY; LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu))); + memset(ma, 0, sizeof(*ma)); + lmv_size = ma->ma_lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1); + + /* + * Preparing memory for LMV. This will be freed after finish splitting. + */ + OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size); + if (ma->ma_lmv == NULL) + RETURN(-ENOMEM); + /* Step1: Checking whether the dir needs to be split. */ rc = cmm_expect_splitting(env, mo, ma, &split); if (rc) - RETURN(rc); + GOTO(cleanup, rc); if (split != CMM_EXPECT_SPLIT) - RETURN(0); + GOTO(cleanup, rc = 0); LASSERTF(mo->mo_pdo_mode == MDL_EX, "Split is only valid if " "dir is protected by MDL_EX lock. Lock mode 0x%x\n", @@ -536,10 +545,12 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS); if (rc) { CERROR("Can't disable trans for split, rc %d\n", rc); - RETURN(rc); + GOTO(cleanup, rc); } /* Step2: Create slave objects (on slave MDTs) */ + ma->ma_valid = 0; + ma->ma_lmv_size = lmv_size; rc = cmm_slaves_create(env, mo, ma); if (rc) { CERROR("Can't create slaves for split, rc %d\n", rc); @@ -568,9 +579,7 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) } EXIT; cleanup: - if (ma->ma_lmv_size && ma->ma_lmv) - OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); - + OBD_FREE(ma->ma_lmv, lmv_size); return rc; } diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c index 80ab880..101503f 100644 --- a/lustre/cmm/mdc_object.c +++ b/lustre/cmm/mdc_object.c @@ -399,7 +399,7 @@ int mdc_send_page(struct cmm_device *cm, const struct lu_env *env, rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), page, offset); - CDEBUG(D_INFO, "send page %p offset %d fid "DFID" rc %d \n", + CDEBUG(rc ? D_ERROR : D_INFO, "send page %p offset %d fid "DFID" rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc); RETURN(rc); } diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index e235d74..c267b8d 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -577,26 +577,33 @@ int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd, int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj, struct lov_mds_md *lmm, int lmm_size) { - struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); - struct obd_device *obd = mdd2obd_dev(mdd); - struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; - struct dt_object *next = mdd_object_child(obj); - __u32 seq = lu_object_fid(mdd2lu_obj(obj))->f_seq; - __u32 oid = lu_object_fid(mdd2lu_obj(obj))->f_oid; - struct obd_capa *oc; + struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); + struct obd_device *obd = mdd2obd_dev(mdd); + struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; + struct dt_object *next = mdd_object_child(obj); + const struct lu_fid *fid = lu_object_fid(mdd2lu_obj(obj)); + struct obd_capa *oc; int rc = 0; ENTRY; + mdd_read_lock(env, obj); rc = next->do_ops->do_attr_get(env, next, tmp_la, mdd_object_capa(env, obj)); + mdd_read_unlock(env, obj); if (rc) RETURN(rc); oc = next->do_ops->do_capa_get(env, next, NULL, CAPA_OPC_MDS_DEFAULT); if (IS_ERR(oc)) oc = NULL; + + /* + * Wangdi: please fix this. OST will oops if this is called. + */ +/* rc = mds_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm, - lmm_size, NULL, seq, oid, oc); + lmm_size, NULL, fid_seq(fid), fid_oid(fid), oc); +*/ capa_put(oc); RETURN(rc); diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 974cb09..9842342 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -284,7 +284,6 @@ static int __mdd_lmm_get(const struct lu_env *env, if (ma->ma_valid & MA_LOV) RETURN(0); - LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0); lmm_size = ma->ma_lmm_size; rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &lmm_size, MDS_LOV_MD_NAME); @@ -704,7 +703,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, struct mdd_device *mdd = mdo2mdd(obj); struct thandle *handle; struct lov_mds_md *lmm = NULL; - int rc = 0, lmm_size = 0, max_size = 0; + int rc, lmm_size = 0, max_size = 0; struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; ENTRY; @@ -718,6 +717,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, ma->ma_attr.la_valid & (LA_UID | LA_GID)) { max_size = mdd_lov_mdsize(env, mdd); OBD_ALLOC(lmm, max_size); + lmm_size = max_size; if (lmm == NULL) GOTO(cleanup, rc = -ENOMEM); @@ -766,7 +766,7 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, } cleanup: mdd_trans_stop(env, mdd, rc, handle); - if (rc == 0 && lmm_size) { + if (rc == 0 && (lmm != NULL && lmm_size > 0 )) { /*set obd attr, if needed*/ rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size); } -- 1.8.3.1