From 0a74b56d69a254f85732c9c4a75280f35df4a844 Mon Sep 17 00:00:00 2001 From: wangdi Date: Sun, 29 Oct 2006 07:11:22 +0000 Subject: [PATCH] Branch:b_new_cmd cleanup in cmm split, after pdo added. --- lustre/cmm/cmm_internal.h | 10 +- lustre/cmm/cmm_object.c | 42 +----- lustre/cmm/cmm_split.c | 335 ++++++++++++++++++++++++++-------------------- 3 files changed, 192 insertions(+), 195 deletions(-) diff --git a/lustre/cmm/cmm_internal.h b/lustre/cmm/cmm_internal.h index 41f2e94..99b9f5e 100644 --- a/lustre/cmm/cmm_internal.h +++ b/lustre/cmm/cmm_internal.h @@ -36,13 +36,6 @@ #include #include -#define CMM_NO_SPLIT_EXPECTED 0 -#define CMM_EXPECT_SPLIT 1 -#define CMM_NOT_SPLITTABLE 2 - -enum { - CMM_SPLIT_SIZE = 64 * 1024 -}; struct cmm_device { struct md_device cmm_md_dev; @@ -154,7 +147,8 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, struct md_attr *ma, int *split); int cmm_try_to_split(const struct lu_env *env, struct md_object *mo); - +int cmm_split_lock_mode(const struct lu_env *env, struct md_object *mo, + mdl_mode_t lm); #endif #endif /* __KERNEL__ */ diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index c1fe0d2..c73f836 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -375,47 +375,11 @@ static int cml_lookup(const struct lu_env *env, struct md_object *mo_p, static mdl_mode_t cml_lock_mode(const struct lu_env *env, struct md_object *mo, mdl_mode_t lm) { + int rc = MDL_MINMODE; #ifdef HAVE_SPLIT_SUPPORT - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct md_attr *ma = &cmm_env_info(env)->cmi_ma; - int rc, split, lmv_size; - ENTRY; - - memset(ma, 0, sizeof(*ma)); - lmv_size = ma->ma_lmv_size = (sizeof(struct lmv_stripe_md) + - (cmm->cmm_tgt_count + 1) * sizeof(struct lu_fid)); - - OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size); - if (ma->ma_lmv == NULL) - RETURN(-ENOMEM); - - /* - * Check only if we need protection from split. If not - mdt handles - * other cases. 
- */ - rc = cmm_expect_splitting(env, mo, ma, &split); - OBD_FREE(ma->ma_lmv, lmv_size); - if (rc) { - CERROR("Can't check for possible split, error %d\n", - rc); - RETURN(MDL_MINMODE); - } - - /* - * Do not take PDO lock on non-splittable objects if this is not PW, - * this should speed things up a bit. - */ - if (split == CMM_NOT_SPLITTABLE && lm != MDL_PW) - RETURN(MDL_NL); - - /* Protect splitting by exclusive lock. */ - if (split == CMM_EXPECT_SPLIT && lm == MDL_PW) - RETURN(MDL_EX); - - /* Have no idea about lock mode, let it be what higher layer wants. */ - RETURN(MDL_MINMODE); + rc = cmm_split_lock_mode(env, mo, lm); #endif - return MDL_MINMODE; + return rc; } static int cml_create(const struct lu_env *env, diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index 726a616..6f344d5 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -40,69 +40,15 @@ #include "cmm_internal.h" #include "mdc_internal.h" -static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area, - ssize_t len) -{ - struct lu_buf *buf; - - buf = &cmm_env_info(env)->cmi_buf; - buf->lb_buf = area; - buf->lb_len = len; - return buf; -} +enum { + CMM_SPLIT_SIZE = 64 * 1024 +}; -int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, - const char *name) -{ - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp)); - struct md_attr *ma = &cmm_env_info(env)->cmi_ma; - int rc; - ENTRY; - - if (cmm->cmm_tgt_count == 0) - RETURN(0); - - /* Try to get the LMV EA size */ - memset(ma, 0, sizeof(*ma)); - ma->ma_need = MA_LMV; - rc = mo_attr_get(env, mp, ma); - if (rc) - RETURN(rc); - - if (ma->ma_valid & MA_LMV) { - int stripe; - - /* - * Clean MA_LMV in ->ma_valid because mdd will do nothing - * counting that EA is already taken. 
- */ - ma->ma_valid &= ~MA_LMV; - - LASSERT(ma->ma_lmv_size > 0); - OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size); - if (ma->ma_lmv == NULL) - RETURN(-ENOMEM); - - /* Get LMV EA */ - ma->ma_need = MA_LMV; - rc = mo_attr_get(env, mp, ma); - - /* Skip checking the slave dirs (mea_count is 0) */ - if (rc == 0 && ma->ma_lmv->mea_count != 0) { - /* - * Get stripe by name to check the name belongs to - * master dir, otherwise return the -ERESTART - */ - stripe = mea_name2idx(ma->ma_lmv, name, strlen(name)); - - /* Master stripe is always 0 */ - if (stripe != 0) - rc = -ERESTART; - } - OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); - } - RETURN(rc); -} +enum { + CMM_NO_SPLIT_EXPECTED = 0, + CMM_EXPECT_SPLIT = 1, + CMM_NOT_SPLITTABLE = 2 +}; int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, struct md_attr *ma, int *split) @@ -112,17 +58,14 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, int rc; ENTRY; - /* - * Check first most light things like tgt count and root fid. For some - * case this style should yeild better performance. - */ + /* No need split for single MDS */ if (cmm->cmm_tgt_count == 0) { *split = CMM_NO_SPLIT_EXPECTED; RETURN(0); } - rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, - &root_fid); + /* No need split for Root object */ + rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, &root_fid); if (rc) RETURN(rc); @@ -131,25 +74,29 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, RETURN(0); } - /* - * MA_INODE is needed to check inode size. - * Memory is prepared by caller. 
+ /* + * Assumption: ma_valid = 0 here, we only need get + * inode and lmv_size for this get_attr */ + LASSERT(ma->ma_valid == 0); ma->ma_need = MA_INODE | MA_LMV; rc = mo_attr_get(env, mo, ma); if (rc) RETURN(rc); - + + /* No need split for already split object */ if (ma->ma_valid & MA_LMV) { + LASSERT(ma->ma_lmv_size > 0); *split = CMM_NOT_SPLITTABLE; RETURN(0); } - + + /* No need split for object whose size < CMM_SPLIT_SIZE */ if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) { *split = CMM_NO_SPLIT_EXPECTED; RETURN(0); } - + *split = CMM_EXPECT_SPLIT; RETURN(0); } @@ -177,12 +124,12 @@ static inline void cmm_object_put(const struct lu_env *env, lu_object_put(env, &o->cmo_obj.mo_lu); } -static int cmm_object_create(const struct lu_env *env, - struct cmm_device *cmm, - struct lu_fid *fid, - struct md_attr *ma, - struct lmv_stripe_md *lmv, - int lmv_size) +static int cmm_slave_object_create(const struct lu_env *env, + struct cmm_device *cmm, + struct lu_fid *fid, + struct md_attr *ma, + struct lmv_stripe_md *lmv, + int lmv_size) { struct md_create_spec *spec; struct cmm_object *obj; @@ -207,20 +154,17 @@ static int cmm_object_create(const struct lu_env *env, RETURN(rc); } -static int cmm_fid_alloc(const struct lu_env *env, - struct cmm_device *cmm, - struct mdc_device *mc, - struct lu_fid *fid) +static int cmm_slave_fid_alloc(const struct lu_env *env, + struct cmm_device *cmm, + struct mdc_device *mc, + struct lu_fid *fid) { int rc; ENTRY; - LASSERT(cmm != NULL); - LASSERT(mc != NULL); - LASSERT(fid != NULL); + LASSERT(cmm != NULL && mc != NULL && fid != NULL); down(&mc->mc_fid_sem); - /* Alloc new fid on @mc. 
*/ rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL); if (rc > 0) { @@ -248,6 +192,7 @@ static int cmm_slaves_create(const struct lu_env *env, ENTRY; lmv = ma->ma_lmv; + /* Init the split MEA */ lmv->mea_master = cmm->cmm_local_num; lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT; lmv->mea_count = cmm->cmm_tgt_count + 1; @@ -265,7 +210,7 @@ static int cmm_slaves_create(const struct lu_env *env, list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) { /* Alloc fid for slave object. */ - rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]); + rc = cmm_slave_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]); if (rc) { CERROR("Can't alloc fid for slave "LPU64", rc %d\n", mc->mc_num, rc); @@ -273,13 +218,14 @@ static int cmm_slaves_create(const struct lu_env *env, } /* Create slave on remote MDT. */ - rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma, - slave_lmv, sizeof(*slave_lmv)); + rc = cmm_slave_object_create(env, cmm, &lmv->mea_ids[i], ma, + slave_lmv, sizeof(*slave_lmv)); if (rc) GOTO(cleanup, rc); i++; } + ma->ma_valid |= MA_LMV; EXIT; cleanup: OBD_FREE_PTR(slave_lmv); @@ -306,9 +252,9 @@ static int cmm_send_split_pages(const struct lu_env *env, RETURN(rc); } -static int cmm_remove_dir_ent(const struct lu_env *env, - struct md_object *mo, - struct lu_dirent *ent) +static int cmm_remove_split_ent(const struct lu_env *env, + struct md_object *mo, + struct lu_dirent *ent) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct cmm_object *obj; @@ -356,7 +302,8 @@ cleanup: } static int cmm_remove_entries(const struct lu_env *env, - struct md_object *mo, struct lu_rdpg *rdpg, + struct md_object *mo, + struct lu_rdpg *rdpg, __u32 hash_end, __u32 *len) { struct lu_dirpage *dp; @@ -366,40 +313,42 @@ static int cmm_remove_entries(const struct lu_env *env, kmap(rdpg->rp_pages[0]); dp = page_address(rdpg->rp_pages[0]); - for (ent = lu_dirent_start(dp); ent != NULL; + *len = 0; + + for (ent = lu_dirent_start(dp); + ent != NULL && ent->lde_hash < hash_end; ent = 
lu_dirent_next(ent)) { - if (ent->lde_hash < hash_end) { - rc = cmm_remove_dir_ent(env, mo, ent); - if (rc) { - CERROR("Can not del %s rc %d\n", ent->lde_name, - rc); - GOTO(unmap, rc); - } - } else { - if (ent != lu_dirent_start(dp)) - *len = (int)((__u32)ent - (__u32)dp); - else - *len = 0; + rc = cmm_remove_split_ent(env, mo, ent); + if (rc) { + /* + * XXX Error handler to insert remove name back, + * currently we assumed it will succeed anyway + * in verification test. + * + */ + CERROR("Can not del %*.*s rc %d\n", ent->lde_namelen, + ent->lde_namelen, ent->lde_name, rc); GOTO(unmap, rc); } + *len += le16_to_cpu(ent->lde_reclen); } - *len = CFS_PAGE_SIZE; + if (ent != lu_dirent_start(dp)) + *len += sizeof(struct lu_dirpage); EXIT; unmap: kunmap(rdpg->rp_pages[0]); return rc; } -static int cmm_split_entries(const struct lu_env *env, - struct md_object *mo, struct lu_rdpg *rdpg, +static int cmm_split_entries(const struct lu_env *env, + struct md_object *mo, + struct lu_rdpg *rdpg, struct lu_fid *lf, __u32 end) { int rc, done = 0; ENTRY; - LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time" "npages %d\n", rdpg->rp_npages); - + LASSERT(rdpg->rp_npages == 1); /* Read split page and send them to the slave master. 
*/ do { struct lu_dirpage *ldp; @@ -444,7 +393,6 @@ static int cmm_split_entries(const struct lu_env *env, } #define SPLIT_PAGE_COUNT 1 - static int cmm_scan_and_split(const struct lu_env *env, struct md_object *mo, struct md_attr *ma) @@ -471,6 +419,7 @@ static int cmm_scan_and_split(const struct lu_env *env, GOTO(cleanup, rc = -ENOMEM); } + LASSERT(ma->ma_valid & MA_LMV); hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1); for (i = 1; i < cmm->cmm_tgt_count + 1; i++) { struct lu_fid *lf; @@ -503,41 +452,41 @@ free_rdpg: return rc; } -#define cmm_md_size(stripes) \ - (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid)) +static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area, + ssize_t len) +{ + struct lu_buf *buf; + + buf = &cmm_env_info(env)->cmi_buf; + buf->lb_buf = area; + buf->lb_len = len; + return buf; +} + +#define CMM_MD_SIZE(stripes) (sizeof(struct lmv_stripe_md) + \ + (stripes) * sizeof(struct lu_fid)) int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct md_attr *ma = &cmm_env_info(env)->cmi_ma; struct lu_buf *buf; - int rc = 0, split, lmv_size; + int rc = 0, split; ENTRY; LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu))); - memset(ma, 0, sizeof(*ma)); - lmv_size = ma->ma_lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1); - - /* - * Preparing memory for LMV. This will be freed after finish splitting. - */ - OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size); - if (ma->ma_lmv == NULL) - RETURN(-ENOMEM); - /* Step1: Checking whether the dir needs to be split. */ rc = cmm_expect_splitting(env, mo, ma, &split); if (rc) - GOTO(cleanup, rc); - + RETURN(rc); if (split != CMM_EXPECT_SPLIT) - GOTO(cleanup, rc = 0); + RETURN(0); LASSERTF(mo->mo_pdo_mode == MDL_EX, "Split is only valid if " "dir is protected by MDL_EX lock. 
Lock mode 0x%x\n", (int)mo->mo_pdo_mode); - + /* * Disable trans for splitting, since there will be so many trans in * this one ops, confilct with current recovery design. @@ -548,38 +497,128 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) GOTO(cleanup, rc); } - /* Step2: Create slave objects (on slave MDTs) */ - ma->ma_valid = 0; - ma->ma_lmv_size = lmv_size; + /* step2: Prepare the md memory */ + ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1); + OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size); + if (ma->ma_lmv == NULL) + RETURN(-ENOMEM); + + /* Step3: Create slave objects and fill the ma->ma_lmv */ rc = cmm_slaves_create(env, mo, ma); if (rc) { CERROR("Can't create slaves for split, rc %d\n", rc); GOTO(cleanup, rc); } - /* Step3: Scan and split the object. */ + /* Step4: Scan and split the object. */ rc = cmm_scan_and_split(env, mo, ma); if (rc) { CERROR("Can't scan and split, rc %d\n", rc); GOTO(cleanup, rc); } + /* Step5: Set mea to the master object. */ + LASSERT(ma->ma_valid & MA_LMV); buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size); - - /* Step4: Set mea to the master object. 
*/ rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0); - if (rc == 0) { - CWARN("Dir "DFID" has been split\n", - PFID(lu_object_fid(&mo->mo_lu))); - rc = -ERESTART; - } else { - CERROR("Can't set MEA to master dir, " - "rc %d\n", rc); + if (rc) { + CERROR("Can't set MEA to master dir, " "rc %d\n", rc); + GOTO(cleanup, rc); } - EXIT; + + /* Finally, split succeed, tell client to recreate the object */ + CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu))); + rc = -ERESTART; cleanup: - OBD_FREE(ma->ma_lmv, lmv_size); - return rc; + OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); + RETURN(rc); +} + +int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, + const char *name) +{ + struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp)); + struct md_attr *ma = &cmm_env_info(env)->cmi_ma; + int rc; + ENTRY; + + if (cmm->cmm_tgt_count == 0) + RETURN(0); + + /* Try to get the LMV EA size */ + memset(ma, 0, sizeof(*ma)); + ma->ma_need = MA_LMV; + rc = mo_attr_get(env, mp, ma); + if (rc) + RETURN(rc); + /* No LMV just return */ + if (!(ma->ma_valid & MA_LMV)) + RETURN(rc); + + LASSERT(ma->ma_lmv_size > 0); + OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size); + if (ma->ma_lmv == NULL) + RETURN(-ENOMEM); + + /* Get LMV EA, Note: refresh valid here for getting LMV_EA */ + ma->ma_valid &= ~MA_LMV; + ma->ma_need = MA_LMV; + rc = mo_attr_get(env, mp, ma); + if (rc) + GOTO(cleanup, rc); + + /* Skip checking the slave dirs (mea_count is 0) */ + if (ma->ma_lmv->mea_count != 0) { + int stripe; + /* + * Get stripe by name to check the name belongs to + * master dir, otherwise return the -ERESTART + */ + stripe = mea_name2idx(ma->ma_lmv, name, strlen(name)); + + /* Master stripe is always 0 */ + if (stripe != 0) + rc = -ERESTART; + } +cleanup: + OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); + RETURN(rc); +} + +int cmm_split_lock_mode(const struct lu_env *env, struct md_object *mo, + mdl_mode_t lm) +{ + struct md_attr *ma = &cmm_env_info(env)->cmi_ma; + int rc, split; + 
ENTRY; + + memset(ma, 0, sizeof(*ma)); + /* + * Check only if we need protection from split. + * If not - mdt handles other cases. + */ + rc = cmm_expect_splitting(env, mo, ma, &split); + if (rc) { + CERROR("Can't check for possible split, rc %d\n", rc); + RETURN(MDL_MINMODE); + } + + /* + * Do not take PDO lock on non-splittable objects if + * this is not PW, this should speed things up a bit. + */ + if (split == CMM_NOT_SPLITTABLE && lm != MDL_PW) + RETURN(MDL_NL); + + /* Protect splitting by exclusive lock. */ + if (split == CMM_EXPECT_SPLIT && lm == MDL_PW) + RETURN(MDL_EX); + + /* + * XXX Have no idea about lock mode, let it be + * what higher layer wants. + */ + RETURN(MDL_MINMODE); } -- 1.8.3.1