X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmds%2Fmds_lmv.c;h=9dca6e354fbb947dc6c74eb0d72901e0cb9a7be2;hp=8afb801e5f3a220de606a61f879a756c7291a358;hb=e44e9b278432a1df83482b1cd83b2081fabe94dc;hpb=090c677210ee2946d99c71412e4ff762bb300f4f diff --git a/lustre/mds/mds_lmv.c b/lustre/mds/mds_lmv.c index 8afb801..9dca6e3 100644 --- a/lustre/mds/mds_lmv.c +++ b/lustre/mds/mds_lmv.c @@ -99,6 +99,10 @@ int mds_lmv_connect(struct obd_device *obd, char * lmv_name) GOTO(err_reg, rc); mds->mds_num = mdsize; + rc = obd_set_info(mds->mds_lmv_exp, strlen("inter_mds"), + "inter_mds", 0, NULL); + if (rc) + GOTO(err_reg, rc); RETURN(0); err_reg: @@ -117,7 +121,8 @@ int mds_lmv_postsetup(struct obd_device *obd) struct mds_obd *mds = &obd->u.mds; ENTRY; if (mds->mds_lmv_exp) - obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize, 0); + obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize, + mds->mds_max_cookiesize); RETURN(0); } @@ -157,7 +162,7 @@ int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode, /* first calculate mea size */ *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp, - (struct lov_mds_md **) mea); + (struct lov_mds_md **)mea); /* FIXME: error handling here */ LASSERT(*mea != NULL); @@ -168,7 +173,6 @@ int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode, if (rc <= 0) { OBD_FREE(*mea, *mea_size); *mea = NULL; - *mea_size = 0; } if (rc > 0) rc = 0; @@ -226,6 +230,7 @@ static int dc_new_page_to_cache(struct dir_cache * dirc) static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf) { + struct mds_obd *mds = &dc->obd->u.mds; struct dir_entry *de; struct dentry *dentry; char * end; @@ -233,20 +238,29 @@ static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf) end = buf + PAGE_SIZE; de = (struct dir_entry *) buf; while ((char *) de < end && de->namelen) { - LASSERT(de->namelen <= 255); /* lookup an inode */ + LASSERT(de->namelen <= 255); dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen); if (IS_ERR(dentry)) { - CERROR("can't lookup '%*s'/%u in %lu: %d\n", - (int) de->namelen, de->name, - (unsigned) de->namelen, - (unsigned long) dc->dentry->d_inode->i_ino, - (int) PTR_ERR(dentry)); + CERROR("can't lookup %*s: %d\n", de->namelen, + de->name, (int) PTR_ERR(dentry)); + goto next; + } + if (dentry->d_inode != NULL) { + de->mds = mds->mds_num; + de->ino = dentry->d_inode->i_ino; + de->generation = dentry->d_inode->i_generation; + } else if (dentry->d_flags & DCACHE_CROSS_REF) { + de->mds = dentry->d_mdsnum; + de->ino = dentry->d_inum; + de->generation = dentry->d_generation; + } else { + CERROR("can't lookup %*s\n", de->namelen, de->name); + goto next; } - LASSERT(!IS_ERR(dentry)); - LASSERT(dentry->d_inode != NULL); - de->generation = dentry->d_inode->i_generation; l_dput(dentry); + +next: de = (struct dir_entry *) ((char *) de + DIR_REC_LEN(de->namelen)); } @@ -357,8 +371,7 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry, OBD_ALLOC(file_name, nlen); if (!file_name) RETURN(-ENOMEM); - i = sprintf(file_name, "__iopen__/%u", - (unsigned) dentry->d_inode->i_ino); + i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino); file = filp_open(file_name, O_RDONLY, 0); if (IS_ERR(file)) { @@ -396,7 +409,47 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry, return 0; } -#define MAX_DIR_SIZE (32 * 1024) +#define MAX_DIR_SIZE (64 * 1024) + +int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry) +{ + struct mds_obd *mds = &obd->u.mds; + struct mea *mea = NULL; + int rc, size; + + /* clustered MD ? */ + if (!mds->mds_lmv_obd) + RETURN(0); + + /* inode exist? */ + if (dentry->d_inode == NULL) + return 0; + + /* a dir can be splitted only */ + if (!S_ISDIR(dentry->d_inode->i_mode)) + return 0; + + /* large enough to be splitted? */ + if (dentry->d_inode->i_size < MAX_DIR_SIZE) + return 0; + + /* don't split root directory */ + if (dentry->d_inode->i_ino == mds->mds_rootfid.id) + return 0; + + mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size); + if (mea) { + /* already splitted or slave object: shouldn't be splitted */ + rc = 0; + } else { + /* may be splitted */ + rc = 1; + } + + if (mea) + OBD_FREE(mea, size); + RETURN(rc); +} /* * must not be called on already splitted directories @@ -404,64 +457,36 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry, int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry, struct mea **mea, int nstripes) { - ldlm_policy_data_t policy = { .l_inodebits = {MDS_INODELOCK_UPDATE}}; - struct ldlm_res_id res_id = { .name = {0} }; struct inode *dir = dentry->d_inode; struct mds_obd *mds = &obd->u.mds; - struct lustre_handle lockh; struct mea *tmea = NULL; struct obdo *oa = NULL; - int rc, flags = 0; - int mea_size = 0; + int rc, mea_size = 0; void *handle; ENTRY; - /* clustered MD ? */ - if (!mds->mds_lmv_obd) - RETURN(0); - - /* don't split root directory */ - if (dentry->d_inode->i_ino == mds->mds_rootfid.id) - RETURN(0); - -#if 0 - if (dir->i_size < MAX_DIR_SIZE) - RETURN(0); -#endif - - /* check is directory marked non-splittable */ - if (mea && *mea) + /* TODO: optimization possible - we already may have mea here */ + if (!mds_splitting_expected(obd, dentry)) RETURN(0); + LASSERT(mea == NULL || *mea == NULL); - CDEBUG(D_OTHER, "%s: split directory %lu/%lu (mea 0x%p)\n", - obd->obd_name, dir->i_ino, - (unsigned long) dir->i_generation, mea); + CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n", + obd->obd_name, mds->mds_num, dir->i_ino, + (unsigned long) dir->i_generation); if (mea == NULL) mea = &tmea; mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL); /* FIXME: Actually we may only want to allocate enough space for - necessary amount of stripes, but on the other hand with this approach - of allocating maximal possible amount of MDS slots, it would be - easier to split the dir over more MDSes */ - rc = obd_alloc_diskmd(mds->mds_lmv_exp, mea); + * necessary amount of stripes, but on the other hand with this + * approach of allocating maximal possible amount of MDS slots, + * it would be easier to split the dir over more MDSes */ + rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea); if (!(*mea)) RETURN(-ENOMEM); (*mea)->mea_count = nstripes; - - /* convert lock on the dir in order tox - * invalidate client's attributes -bzzz */ - res_id.name[0] = dir->i_ino; - res_id.name[1] = dir->i_generation; - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, - LDLM_IBITS, &policy, LCK_PW, &flags, - mds_blocking_ast, ldlm_completion_ast, NULL, NULL, - NULL, 0, NULL, &lockh); - if (rc != ELDLM_OK) { - CERROR("error: rc = %d\n", rc); - } - + /* 1) create directory objects on slave MDS'es */ /* FIXME: should this be OBD method? */ oa = obdo_alloc(); @@ -473,7 +498,7 @@ int mds_try_to_split_dir(struct obd_device *obd, OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLUID | OBD_MD_FLGID); oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num; - oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLGROUP; + oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP; oa->o_mode = dir->i_mode; CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n", obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid); @@ -491,13 +516,11 @@ int mds_try_to_split_dir(struct obd_device *obd, LASSERT(!IS_ERR(handle)); rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size); LASSERT(rc == 0); - fsfilt_commit(obd, dir, handle, 0); + fsfilt_commit(obd, mds->mds_sb, dir, handle, 0); LASSERT(rc == 0); up(&dir->i_sem); obdo_free(oa); - ldlm_lock_decref(&lockh, LCK_PW); - /* 3) read through the dir and distribute it over objects */ scan_and_distribute(obd, dentry, *mea); @@ -525,7 +548,6 @@ static int filter_start_page_write(struct inode *inode, struct dentry *filter_fid2dentry(struct obd_device *obd, struct dentry *dir_dentry, obd_gr group, obd_id id); -void f_dput(struct dentry *dentry); int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, @@ -554,7 +576,7 @@ int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa, if (dentry->d_inode == NULL) { CERROR("trying to BRW to non-existent file "LPU64"\n", obj->ioo_id); - f_dput(dentry); + l_dput(dentry); GOTO(cleanup, rc = -ENOENT); } @@ -578,7 +600,7 @@ int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa, i, obj->ioo_bufcnt, dentry, rc); while (lnb-- > res) __free_pages(lnb->page, 0); - f_dput(dentry); + l_dput(dentry); GOTO(cleanup, rc); } tot_bytes += lnb->len; @@ -636,8 +658,23 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) __free_page(lnb->page); - f_dput(res->dentry); + l_dput(res->dentry); RETURN(rc); } +int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags) +{ + struct lmv_obd *lmv; + struct mds_obd *mds = &obd->u.mds; + int i = mds->mds_num; + + if (flags & REC_REINT_CREATE) { + i = mds->mds_num; + } else if (mds->mds_lmv_exp) { + lmv = &mds->mds_lmv_exp->exp_obd->u.lmv; + i = raw_name2idx(lmv->desc.ld_tgt_count, name, len); + } + RETURN(i); +} +