X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fcmm%2Fcmm_split.c;h=293a5bb88859ae598e6e0420daca5e76231a0671;hb=a9ef8e5d49753281ff76094c90fa22accd774096;hp=847b4285f6440c662f6f43b511a1b90b23806c43;hpb=ac74a14c4717e4cca548863f0194d8157ad9eba2;p=fs%2Flustre-release.git diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index 847b428..293a5bb 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -40,36 +40,84 @@ #include "cmm_internal.h" #include "mdc_internal.h" -#define CMM_NO_SPLIT_EXPECTED 0 -#define CMM_EXPECT_SPLIT 1 -#define CMM_NO_SPLITTABLE 2 +static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area, + ssize_t len) +{ + struct lu_buf *buf; -enum { - SPLIT_SIZE = 64*1024 -}; + buf = &cmm_env_info(env)->cmi_buf; + buf->lb_buf = area; + buf->lb_len = len; + return buf; +} -static inline struct lu_fid* cmm2_fid(struct cmm_object *obj) +int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, + const char *name) { - return &(obj->cmo_obj.mo_lu.lo_header->loh_fid); + struct md_attr *ma = &cmm_env_info(env)->cmi_ma; + int rc; + ENTRY; + + /* Try to get the LMV EA size */ + memset(ma, 0, sizeof(*ma)); + ma->ma_need = MA_INODE | MA_LMV; + rc = mo_attr_get(env, mp, ma); + if (rc) + RETURN(rc); + + if (ma->ma_valid & MA_LMV) { + int stripe; + + OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size); + if (ma->ma_lmv == NULL) + RETURN(-ENOMEM); + + /* Get LMV EA */ + ma->ma_need = MA_INODE | MA_LMV; + rc = mo_attr_get(env, mp, ma); + if (rc) + RETURN(rc); + + /* Skip checking the slave dirs (mea_count == 0) */ + if (ma->ma_lmv->mea_count == 0) + RETURN(0); + /* + * Get stripe by name to check the name belongs to master dir, + * otherwise return the -ERESTART + */ + stripe = mea_name2idx(ma->ma_lmv, name, strlen(name)); + + /* Master stripe is always 0 */ + if (stripe != 0) + rc = -ERESTART; + + OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); + } + RETURN(rc); } -static int cmm_expect_splitting(const struct lu_env *env, - struct md_object *mo, - struct md_attr *ma) +int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, + struct md_attr *ma) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct lu_fid *fid = NULL; int rc = CMM_EXPECT_SPLIT; ENTRY; + ma->ma_need = MA_INODE | MA_LMV; + rc = mo_attr_get(env, mo, ma); + if (rc) + GOTO(cleanup, rc = CMM_NOT_SPLITTABLE); + if (cmm->cmm_tgt_count == 0) GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); - if (ma->ma_attr.la_size < SPLIT_SIZE) + if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); if (ma->ma_lmv_size) GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); + OBD_ALLOC_PTR(fid); rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid); if (rc) @@ -77,65 +125,28 @@ static int cmm_expect_splitting(const struct lu_env *env, rc = CMM_EXPECT_SPLIT; - if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo)))) + if (lu_fid_eq(fid, cmm2fid(md2cmm_obj(mo)))) GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); + EXIT; cleanup: if (fid) OBD_FREE_PTR(fid); - RETURN(rc); + return rc; } -#define cmm_md_size(stripes) \ +#define cmm_md_size(stripes) \ (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid)) -static int cmm_alloc_fid(const struct lu_env *env, struct cmm_device *cmm, - struct lu_fid *fid, int count) -{ - struct mdc_device *mc, *tmp; - int rc = 0, i = 0; - - LASSERT(count == cmm->cmm_tgt_count); - /* FIXME: this spin_lock maybe not proper, - * because fid_alloc may need RPC */ - spin_lock(&cmm->cmm_tgt_guard); - list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, - mc_linkage) { - LASSERT(cmm->cmm_local_num != mc->mc_num); - - rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL); - if (rc > 0) { - struct lu_site *ls; - - ls = cmm->cmm_md_dev.md_lu_dev.ld_site; - rc = fld_client_create(ls->ls_client_fld, - fid_seq(&fid[i]), - mc->mc_num, env); - } - if (rc < 0) { - spin_unlock(&cmm->cmm_tgt_guard); - RETURN(rc); - } - i++; - } - spin_unlock(&cmm->cmm_tgt_guard); - LASSERT(i == count); - if (rc == 1) - rc = 0; - RETURN(rc); -} - struct cmm_object *cmm_object_find(const struct lu_env *env, struct cmm_device *d, - const struct lu_fid *f, - struct lustre_capa *capa) + const struct lu_fid *f) { struct lu_object *o; struct cmm_object *m; ENTRY; - o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f, - capa); + o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f); if (IS_ERR(o)) m = (struct cmm_object *)o; else @@ -150,20 +161,19 @@ static inline void cmm_object_put(const struct lu_env *env, lu_object_put(env, &o->cmo_obj.mo_lu); } -static int cmm_creat_remote_obj(const struct lu_env *env, - struct cmm_device *cmm, - struct lu_fid *fid, struct md_attr *ma, - const struct lmv_stripe_md *lmv, - int lmv_size) +static int cmm_object_create(const struct lu_env *env, + struct cmm_device *cmm, + struct lu_fid *fid, + struct md_attr *ma, + struct lmv_stripe_md *lmv, + int lmv_size) { - struct cmm_object *obj; struct md_create_spec *spec; + struct cmm_object *obj; int rc; ENTRY; - /* XXX Since capablity will not work with split. so we - * pass NULL capablity here */ - obj = cmm_object_find(env, cmm, fid, NULL); + obj = cmm_object_find(env, cmm, fid); if (IS_ERR(obj)) RETURN(PTR_ERR(obj)); @@ -181,18 +191,48 @@ static int cmm_creat_remote_obj(const struct lu_env *env, RETURN(rc); } -static int cmm_create_slave_objects(const struct lu_env *env, - struct md_object *mo, struct md_attr *ma) +static int cmm_fid_alloc(const struct lu_env *env, + struct cmm_device *cmm, + struct mdc_device *mc, + struct lu_fid *fid) +{ + int rc; + ENTRY; + + LASSERT(cmm != NULL); + LASSERT(mc != NULL); + LASSERT(fid != NULL); + + down(&mc->mc_fid_sem); + + /* Alloc new fid on @mc. */ + rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL); + if (rc > 0) { + /* Setup FLD for new sequenceif needed. */ + rc = fld_client_create(cmm->cmm_fld, fid_seq(fid), + mc->mc_num, env); + if (rc) + CERROR("Can't create fld entry, rc %d\n", rc); + } + up(&mc->mc_fid_sem); + + RETURN(rc); +} + +static int cmm_slaves_create(const struct lu_env *env, + struct md_object *mo, + struct md_attr *ma) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL; - int lmv_size, i, rc; - struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo)); + struct lu_fid *lf = cmm2fid(md2cmm_obj(mo)); + struct mdc_device *mc, *tmp; + int lmv_size, i = 1, rc = 0; ENTRY; lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1); - /* This lmv will be free after finish splitting. */ + /* This lmv will free after finish splitting. */ OBD_ALLOC(lmv, lmv_size); if (!lmv) RETURN(-ENOMEM); @@ -201,13 +241,9 @@ static int cmm_create_slave_objects(const struct lu_env *env, lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT; lmv->mea_count = cmm->cmm_tgt_count + 1; + /* Store master FID to local node idx number. */ lmv->mea_ids[0] = *lf; - rc = cmm_alloc_fid(env, cmm, &lmv->mea_ids[1], - cmm->cmm_tgt_count); - if (rc) - GOTO(cleanup, rc); - OBD_ALLOC_PTR(slave_lmv); if (!slave_lmv) GOTO(cleanup, rc = -ENOMEM); @@ -215,23 +251,41 @@ static int cmm_create_slave_objects(const struct lu_env *env, slave_lmv->mea_master = cmm->cmm_local_num; slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT; slave_lmv->mea_count = 0; - for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) { - rc = cmm_creat_remote_obj(env, cmm, &lmv->mea_ids[i], ma, - slave_lmv, sizeof(slave_lmv)); + + list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) { + /* Alloc fid for slave object. */ + rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]); + if (rc) { + CERROR("Can't alloc fid for slave "LPU64", rc %d\n", + mc->mc_num, rc); + GOTO(cleanup, rc); + } + + /* Create slave on remote MDT. */ + rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma, + slave_lmv, sizeof(*slave_lmv)); if (rc) GOTO(cleanup, rc); + i++; } ma->ma_lmv_size = lmv_size; ma->ma_lmv = lmv; + EXIT; cleanup: if (slave_lmv) OBD_FREE_PTR(slave_lmv); - RETURN(rc); + if (rc && lmv) { + OBD_FREE(lmv, lmv_size); + ma->ma_lmv = NULL; + ma->ma_lmv_size = 0; + } + return rc; } static int cmm_send_split_pages(const struct lu_env *env, - struct md_object *mo, struct lu_rdpg *rdpg, + struct md_object *mo, + struct lu_rdpg *rdpg, struct lu_fid *fid, int len) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); @@ -239,7 +293,7 @@ static int cmm_send_split_pages(const struct lu_env *env, int rc = 0; ENTRY; - obj = cmm_object_find(env, cmm, fid, NULL); + obj = cmm_object_find(env, cmm, fid); if (IS_ERR(obj)) RETURN(PTR_ERR(obj)); @@ -249,6 +303,55 @@ static int cmm_send_split_pages(const struct lu_env *env, RETURN(rc); } +static int cmm_remove_dir_ent(const struct lu_env *env, + struct md_object *mo, + struct lu_dirent *ent) +{ + struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); + struct cmm_object *obj; + char *name; + int is_dir, rc; + ENTRY; + + if (!strncmp(ent->lde_name, ".", ent->lde_namelen) || + !strncmp(ent->lde_name, "..", ent->lde_namelen)) + RETURN(0); + + obj = cmm_object_find(env, cmm, &ent->lde_fid); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + + if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0) + is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu)); + else + /* XXX: is this correct? */ + is_dir = 1; + + OBD_ALLOC(name, ent->lde_namelen + 1); + if (!name) + GOTO(cleanup, rc = -ENOMEM); + + memcpy(name, ent->lde_name, ent->lde_namelen); + rc = mdo_name_remove(env, md_object_next(mo), + name, is_dir); + OBD_FREE(name, ent->lde_namelen + 1); + if (rc) + GOTO(cleanup, rc); + + /* + * This ent will be transferred to slave MDS and insert it there, so in + * the slave MDS, we should know whether this object is dir or not, so + * use the highest bit of the hash to indicate that (because we do not + * use highest bit of hash). + */ + if (is_dir) + ent->lde_hash |= MAX_HASH_HIGHEST_BIT; +cleanup: + cmm_object_put(env, obj); + + RETURN(rc); +} + static int cmm_remove_entries(const struct lu_env *env, struct md_object *mo, struct lu_rdpg *rdpg, __u32 hash_end, __u32 *len) @@ -261,26 +364,13 @@ static int cmm_remove_entries(const struct lu_env *env, kmap(rdpg->rp_pages[0]); dp = page_address(rdpg->rp_pages[0]); for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) { + ent = lu_dirent_next(ent)) { if (ent->lde_hash < hash_end) { - if (strncmp(ent->lde_name, ".", ent->lde_namelen) && - strncmp(ent->lde_name, "..", ent->lde_namelen)) { - char *name; - /* FIXME: Here we allocate name for each name, - * maybe stupid, but can not find better way. - * will find better way */ - OBD_ALLOC(name, ent->lde_namelen + 1); - memcpy(name, ent->lde_name, ent->lde_namelen); - rc = mdo_name_remove(env, md_object_next(mo), - name, 0); - OBD_FREE(name, ent->lde_namelen + 1); - } + rc = cmm_remove_dir_ent(env, mo, ent); if (rc) { - /* FIXME: Do not know why it return -ENOENT - * in some case - * */ - if (rc != -ENOENT) - GOTO(unmap, rc); + CERROR("Can not del %s rc %d\n", ent->lde_name, + rc); + GOTO(unmap, rc); } } else { if (ent != lu_dirent_start(dp)) @@ -291,9 +381,10 @@ static int cmm_remove_entries(const struct lu_env *env, } } *len = CFS_PAGE_SIZE; + EXIT; unmap: kunmap(rdpg->rp_pages[0]); - RETURN(rc); + return rc; } static int cmm_split_entries(const struct lu_env *env, @@ -304,8 +395,9 @@ static int cmm_split_entries(const struct lu_env *env, ENTRY; LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time" - "npages %d \n", rdpg->rp_npages); - /* Read splitted page and send them to the slave master */ + "npages %d\n", rdpg->rp_npages); + + /* Read split page and send them to the slave master. */ do { struct lu_dirpage *ldp; __u32 len = 0; @@ -315,14 +407,8 @@ static int cmm_split_entries(const struct lu_env *env, kunmap(rdpg->rp_pages[0]); rc = mo_readpage(env, md_object_next(mo), rdpg); - /* -E2BIG means it already reach the end of the dir */ - if (rc) { - if (rc != -ERANGE) { - if (rc == -E2BIG) - rc = 0; - RETURN(rc); - } - } + if (rc) + RETURN(rc); /* Remove the old entries */ rc = cmm_remove_entries(env, mo, rdpg, end, &len); @@ -347,13 +433,16 @@ static int cmm_split_entries(const struct lu_env *env, RETURN(rc); } + #define SPLIT_PAGE_COUNT 1 + static int cmm_scan_and_split(const struct lu_env *env, - struct md_object *mo, struct md_attr *ma) + struct md_object *mo, + struct md_attr *ma) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); + struct lu_rdpg *rdpg = NULL; __u32 hash_segement; - struct lu_rdpg *rdpg = NULL; int rc = 0, i; OBD_ALLOC_PTR(rdpg); @@ -363,7 +452,7 @@ static int cmm_scan_and_split(const struct lu_env *env, rdpg->rp_npages = SPLIT_PAGE_COUNT; rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages; - OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); + OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0])); if (rdpg->rp_pages == NULL) GOTO(free_rdpg, rc = -ENOMEM); @@ -375,91 +464,80 @@ static int cmm_scan_and_split(const struct lu_env *env, hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1); for (i = 1; i < cmm->cmm_tgt_count + 1; i++) { - struct lu_fid *lf = &ma->ma_lmv->mea_ids[i]; + struct lu_fid *lf; __u32 hash_end; + lf = &ma->ma_lmv->mea_ids[i]; + rdpg->rp_hash = i * hash_segement; hash_end = rdpg->rp_hash + hash_segement; rc = cmm_split_entries(env, mo, rdpg, lf, hash_end); if (rc) GOTO(cleanup, rc); } + EXIT; cleanup: for (i = 0; i < rdpg->rp_npages; i++) if (rdpg->rp_pages[i] != NULL) __free_pages(rdpg->rp_pages[i], 0); if (rdpg->rp_pages) OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * - sizeof rdpg->rp_pages[0]); + sizeof rdpg->rp_pages[0]); free_rdpg: if (rdpg) OBD_FREE_PTR(rdpg); - RETURN(rc); -} - -static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area, - ssize_t len) -{ - struct lu_buf *buf; - - buf = &cmm_env_info(env)->cmi_buf; - buf->lb_buf = area; - buf->lb_len = len; - return buf; + return rc; } -int cml_try_to_split(const struct lu_env *env, struct md_object *mo) +int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct md_attr *ma; + struct md_attr *ma = &cmm_env_info(env)->cmi_ma; struct lu_buf *buf; int rc = 0; ENTRY; LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu))); + memset(ma, 0, sizeof(*ma)); - OBD_ALLOC_PTR(ma); - if (ma == NULL) - RETURN(-ENOMEM); - - ma->ma_need = MA_INODE|MA_LMV; - rc = mo_attr_get(env, mo, ma); - if (rc) - GOTO(cleanup, ma); - - /* step1: checking whether the dir need to be splitted */ + /* Step1: Checking whether the dir needs to be split. */ rc = cmm_expect_splitting(env, mo, ma); if (rc != CMM_EXPECT_SPLIT) GOTO(cleanup, rc = 0); - /* Disable trans for splitting, since there will be - * so many trans in this one ops, confilct with current - * recovery design */ + /* + * Disable trans for splitting, since there will be so many trans in + * this one ops, confilct with current recovery design. + */ rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS); if (rc) GOTO(cleanup, rc = 0); - /* step2: create slave objects */ - rc = cmm_create_slave_objects(env, mo, ma); + /* Step2: Create slave objects (on slave MDTs) */ + rc = cmm_slaves_create(env, mo, ma); if (rc) GOTO(cleanup, ma); - /* step3: scan and split the object */ + /* Step3: Scan and split the object. */ rc = cmm_scan_and_split(env, mo, ma); if (rc) GOTO(cleanup, ma); buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size); - /* step4: set mea to the master object */ - rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0); - if (rc == -ERESTART) - CWARN("Dir"DFID" has been split \n", - PFID(lu_object_fid(&mo->mo_lu))); + + /* Step4: Set mea to the master object. */ + rc = mo_xattr_set(env, md_object_next(mo), buf, + MDS_LMV_MD_NAME, 0); + if (rc == -ERESTART) { + CWARN("Dir "DFID" has been split\n", + PFID(lu_object_fid(&mo->mo_lu))); + } + EXIT; cleanup: if (ma->ma_lmv_size && ma->ma_lmv) OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); - - OBD_FREE_PTR(ma); - RETURN(rc); + + return rc; } +