X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmds%2Fmds_lmv.c;h=9dca6e354fbb947dc6c74eb0d72901e0cb9a7be2;hp=9ea4849556242f74a6c70cd5046fe3ad2dc41baf;hb=e44e9b278432a1df83482b1cd83b2081fabe94dc;hpb=cc88d5e20dfb1aef12b1dfe3aa69936a924e1525 diff --git a/lustre/mds/mds_lmv.c b/lustre/mds/mds_lmv.c index 9ea4849..9dca6e3 100644 --- a/lustre/mds/mds_lmv.c +++ b/lustre/mds/mds_lmv.c @@ -99,6 +99,10 @@ int mds_lmv_connect(struct obd_device *obd, char * lmv_name) GOTO(err_reg, rc); mds->mds_num = mdsize; + rc = obd_set_info(mds->mds_lmv_exp, strlen("inter_mds"), + "inter_mds", 0, NULL); + if (rc) + GOTO(err_reg, rc); RETURN(0); err_reg: @@ -158,7 +162,7 @@ int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode, /* first calculate mea size */ *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp, - (struct lov_mds_md **) mea); + (struct lov_mds_md **)mea); /* FIXME: error handling here */ LASSERT(*mea != NULL); @@ -407,6 +411,46 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry, #define MAX_DIR_SIZE (64 * 1024) +int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry) +{ + struct mds_obd *mds = &obd->u.mds; + struct mea *mea = NULL; + int rc, size; + + /* clustered MD ? */ + if (!mds->mds_lmv_obd) + RETURN(0); + + /* inode exist? */ + if (dentry->d_inode == NULL) + return 0; + + /* a dir can be splitted only */ + if (!S_ISDIR(dentry->d_inode->i_mode)) + return 0; + + /* large enough to be splitted? */ + if (dentry->d_inode->i_size < MAX_DIR_SIZE) + return 0; + + /* don't split root directory */ + if (dentry->d_inode->i_ino == mds->mds_rootfid.id) + return 0; + + mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size); + if (mea) { + /* already splitted or slave object: shouldn't be splitted */ + rc = 0; + } else { + /* may be splitted */ + rc = 1; + } + + if (mea) + OBD_FREE(mea, size); + RETURN(rc); +} + /* * must not be called on already splitted directories */ @@ -421,22 +465,10 @@ int mds_try_to_split_dir(struct obd_device *obd, void *handle; ENTRY; - /* clustered MD ? */ - if (!mds->mds_lmv_obd) - RETURN(0); - - /* don't split root directory */ - if (dentry->d_inode->i_ino == mds->mds_rootfid.id) - RETURN(0); - - /* we want to split only large dirs. this may be already - * splitted dir or a slave dir created during splitting */ - if (dir->i_size < MAX_DIR_SIZE) - RETURN(0); - - /* check is directory marked non-splittable */ - if (mea && *mea) + /* TODO: optimization possible - we already may have mea here */ + if (!mds_splitting_expected(obd, dentry)) RETURN(0); + LASSERT(mea == NULL || *mea == NULL); CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n", obd->obd_name, mds->mds_num, dir->i_ino, @@ -455,8 +487,6 @@ int mds_try_to_split_dir(struct obd_device *obd, RETURN(-ENOMEM); (*mea)->mea_count = nstripes; -#warning "we have to take EX lock on a dir for splitting" - /* 1) create directory objects on slave MDS'es */ /* FIXME: should this be OBD method? */ oa = obdo_alloc(); @@ -486,7 +516,7 @@ int mds_try_to_split_dir(struct obd_device *obd, LASSERT(!IS_ERR(handle)); rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size); LASSERT(rc == 0); - fsfilt_commit(obd, dir, handle, 0); + fsfilt_commit(obd, mds->mds_sb, dir, handle, 0); LASSERT(rc == 0); up(&dir->i_sem); obdo_free(oa); @@ -518,7 +548,6 @@ static int filter_start_page_write(struct inode *inode, struct dentry *filter_fid2dentry(struct obd_device *obd, struct dentry *dir_dentry, obd_gr group, obd_id id); -void f_dput(struct dentry *dentry); int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, @@ -547,7 +576,7 @@ int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa, if (dentry->d_inode == NULL) { CERROR("trying to BRW to non-existent file "LPU64"\n", obj->ioo_id); - f_dput(dentry); + l_dput(dentry); GOTO(cleanup, rc = -ENOENT); } @@ -571,7 +600,7 @@ int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa, i, obj->ioo_bufcnt, dentry, rc); while (lnb-- > res) __free_pages(lnb->page, 0); - f_dput(dentry); + l_dput(dentry); GOTO(cleanup, rc); } tot_bytes += lnb->len; @@ -634,13 +663,18 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, RETURN(rc); } -int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len) +int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags) { + struct lmv_obd *lmv; struct mds_obd *mds = &obd->u.mds; - struct lmv_obd *lmv = &mds->mds_lmv_exp->exp_obd->u.lmv; - int i; + int i = mds->mds_num; - i = raw_name2idx(lmv->count, name, len); + if (flags & REC_REINT_CREATE) { + i = mds->mds_num; + } else if (mds->mds_lmv_exp) { + lmv = &mds->mds_lmv_exp->exp_obd->u.lmv; + i = raw_name2idx(lmv->desc.ld_tgt_count, name, len); + } RETURN(i); }