From 170929c1653b1dc993f98cd9f250385a7a8cc0fd Mon Sep 17 00:00:00 2001 From: yury Date: Sat, 28 Oct 2006 11:28:05 +0000 Subject: [PATCH] - fixed ma_valid in cmm_mdsnum_check(); - fixes in cmm_try_to_split(); - added error messages to cmm_try_to_split(); - cache mdsnum for lmv slaves so that we need not to lookup them all the time. This reduces number of fld cache lookups from ~46000 to ~12000 on sanity 24o. --- lustre/cmm/cmm_internal.h | 2 +- lustre/cmm/cmm_object.c | 15 ++---- lustre/cmm/cmm_split.c | 34 +++++++++---- lustre/fld/fld_cache.c | 4 +- lustre/lmv/lmv_intent.c | 55 +++++++++----------- lustre/lmv/lmv_internal.h | 11 +++- lustre/lmv/lmv_obd.c | 124 ++++++++++++++++++++++++---------------------- lustre/lmv/lmv_object.c | 13 ++++- lustre/mdd/mdd_lov.c | 2 +- lustre/mdd/mdd_object.c | 3 +- 10 files changed, 145 insertions(+), 118 deletions(-) diff --git a/lustre/cmm/cmm_internal.h b/lustre/cmm/cmm_internal.h index def7915..41f2e94 100644 --- a/lustre/cmm/cmm_internal.h +++ b/lustre/cmm/cmm_internal.h @@ -45,7 +45,7 @@ enum { }; struct cmm_device { - struct md_device cmm_md_dev; + struct md_device cmm_md_dev; /* device flags, taken from enum cmm_flags */ __u32 cmm_flags; /* underlaying device in MDS stack, usually MDD */ diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index 793e078..04edafc 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -379,20 +379,18 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env, struct md_attr *ma = &cmm_env_info(env)->cmi_ma; int rc, split; ENTRY; - - memset(ma, 0, sizeof(*ma)); /* - * Check only if we need protection from split. If not - mdt - * handles other cases. + * Check only if we need protection from split. If not - mdt handles + * other cases. */ - rc = cmm_expect_splitting(env, mo, ma, &split); + rc = cmm_expect_splitting(env, mo, ma, &split); if (rc) { CERROR("Can't check for possible split, error %d\n", rc); RETURN(MDL_MINMODE); } - + /* * Do not take PDO lock on non-splittable objects if this is not PW, * this should speed things up a bit. @@ -401,11 +399,8 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env, RETURN(MDL_NL); /* Protect splitting by exclusive lock. */ - if (split == CMM_EXPECT_SPLIT && lm == MDL_PW) { - CDEBUG(D_INFO|D_WARNING, "Going to split "DFID"\n", - PFID(lu_object_fid(&mo->mo_lu))); + if (split == CMM_EXPECT_SPLIT && lm == MDL_PW) RETURN(MDL_EX); - } /* Have no idea about lock mode, let it be what higher layer wants. */ RETURN(MDL_MINMODE); diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index 5fec095..baf7ba0 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -72,6 +72,13 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, if (ma->ma_valid & MA_LMV) { int stripe; + /* + * Clean MA_LMV in ->ma_valid because mdd will do nothing + * counting that EA is already taken. + */ + ma->ma_valid &= ~MA_LMV; + + LASSERT(ma->ma_lmv_size > 0); OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size); if (ma->ma_lmv == NULL) RETURN(-ENOMEM); @@ -125,6 +132,7 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, } /* MA_INODE is needed to check inode size. */ + memset(ma, 0, sizeof(*ma)); ma->ma_need = MA_INODE | MA_LMV; rc = mo_attr_get(env, mo, ma); if (rc) @@ -508,15 +516,14 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) ENTRY; LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu))); - memset(ma, 0, sizeof(*ma)); /* Step1: Checking whether the dir needs to be split. */ rc = cmm_expect_splitting(env, mo, ma, &split); if (rc) - GOTO(cleanup, rc); + RETURN(rc); if (split != CMM_EXPECT_SPLIT) - GOTO(cleanup, rc = 0); + RETURN(0); LASSERTF(mo->mo_pdo_mode == MDL_EX, "Split is only valid if " "dir is protected by MDL_EX lock. Lock mode 0x%x\n", @@ -527,18 +534,24 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) * this one ops, confilct with current recovery design. */ rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS); - if (rc) - GOTO(cleanup, rc = 0); + if (rc) { + CERROR("Can't disable trans for split, rc %d\n", rc); + RETURN(rc); + } /* Step2: Create slave objects (on slave MDTs) */ rc = cmm_slaves_create(env, mo, ma); - if (rc) - GOTO(cleanup, ma); + if (rc) { + CERROR("Can't create slaves for split, rc %d\n", rc); + GOTO(cleanup, rc); + } /* Step3: Scan and split the object. */ rc = cmm_scan_and_split(env, mo, ma); - if (rc) - GOTO(cleanup, ma); + if (rc) { + CERROR("Can't scan and split, rc %d\n", rc); + GOTO(cleanup, rc); + } buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size); @@ -549,6 +562,9 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu))); rc = -ERESTART; + } else { + CERROR("Can't set MEA to master dir, " + "rc %d\n", rc); } EXIT; cleanup: diff --git a/lustre/fld/fld_cache.c b/lustre/fld/fld_cache.c index 86ea531..26a68a7 100644 --- a/lustre/fld/fld_cache.c +++ b/lustre/fld/fld_cache.c @@ -166,8 +166,8 @@ fld_cache_bucket(struct fld_cache *cache, seqno_t seq) /* * Check if cache needs to be shrinked. If so - do it. Tries to keep all - * collision lists well balanced. That is, checks all of them and removes one - * entry in list and so on. + * collision lists well balanced. That is, check all of them and remove one + * entry in list and so on until cache is shrinked enough. */ static int fld_cache_shrink(struct fld_cache *cache) { diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 16f768f..f771763 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -101,7 +101,7 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm, it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; - tgt_exp = lmv_get_export(lmv, &body->fid1); + tgt_exp = lmv_find_export(lmv, &body->fid1); if (IS_ERR(tgt_exp)) GOTO(out, rc = PTR_ERR(tgt_exp)); @@ -157,10 +157,8 @@ int lmv_alloc_fid_for_split(struct obd_device *obd, struct lu_fid *pid, mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)op->name, op->namelen); rpid = &obj->lo_inodes[mea_idx].li_fid; - rc = lmv_fld_lookup(lmv, rpid, &mds); + mds = obj->lo_inodes[mea_idx].li_mds; lmv_obj_put(obj); - if (rc) - RETURN(rc); rc = __lmv_fid_alloc(lmv, fid, mds); if (rc) { @@ -217,11 +215,12 @@ repeat: op_data->namelen); rpid = obj->lo_inodes[mea_idx].li_fid; + tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds); lmv_obj_put(obj); CDEBUG(D_OTHER, "Choose slave dir ("DFID")\n", PFID(&rpid)); + } else { + tgt_exp = lmv_find_export(lmv, &rpid); } - - tgt_exp = lmv_get_export(lmv, &rpid); if (IS_ERR(tgt_exp)) GOTO(out_free_sop_data, rc = PTR_ERR(tgt_exp)); @@ -267,11 +266,10 @@ repeat: cb_blocking, extra_lock_flags); if (rc != 0) { LASSERT(rc < 0); - /* * This is possible, that some userspace application will try to * open file as directory and we will have -ENOTDIR here. As - * this is "usual" situation, we should not print error here, + * this is normal situation, we should not print error here, * only debug info. */ CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):" @@ -373,11 +371,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data, if (obj) { if (!lu_fid_eq(&op_data->fid1, &op_data->fid2)){ rpid = obj->lo_inodes[mds].li_fid; - rc = lmv_fld_lookup(lmv, &rpid, &mds); - if (rc) { - lmv_obj_put(obj); - GOTO(out_free_sop_data, rc); - } + mds = obj->lo_inodes[mds].li_mds; } lmv_obj_put(obj); } @@ -398,15 +392,15 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data, (char *)op_data->name, op_data->namelen); rpid = obj->lo_inodes[mea_idx].li_fid; - rc = lmv_fld_lookup(lmv, &rpid, &mds); - if (rc) { - lmv_obj_put(obj); - GOTO(out_free_sop_data, rc); - } + mds = obj->lo_inodes[mea_idx].li_mds; lmv_obj_put(obj); CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n", mds, PFID(&rpid)); + } else { + rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds); + if (rc) + GOTO(out_free_sop_data, rc); } } @@ -563,7 +557,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) op_data->fid1 = fid; op_data->fid2 = fid; - tgt_exp = lmv_get_export(lmv, &fid); + tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); if (IS_ERR(tgt_exp)) GOTO(cleanup, rc = PTR_ERR(tgt_exp)); @@ -662,18 +656,17 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data, (char *)op_data->name, op_data->namelen); rpid = obj->lo_inodes[mea_idx].li_fid; + mds = obj->lo_inodes[mea_idx].li_mds; lmv_obj_put(obj); + } else { + rc = lmv_fld_lookup(lmv, &rpid, &mds); + if (rc) + GOTO(out_free_sop_data, rc); } - rc = lmv_fld_lookup(lmv, &rpid, &mds); - if (rc) - GOTO(out_free_sop_data, rc); CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n", PFID(&op_data->fid2), mds); } else { - rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds); - if (rc) - GOTO(out_free_sop_data, rc); repeat: LASSERT(++loop <= 2); @@ -691,13 +684,13 @@ repeat: (char *)op_data->name, op_data->namelen); rpid = obj->lo_inodes[mea_idx].li_fid; - rc = lmv_fld_lookup(lmv, &rpid, &mds); - if (rc) { - lmv_obj_put(obj); - GOTO(out_free_sop_data, rc); - } + mds = obj->lo_inodes[mea_idx].li_mds; } lmv_obj_put(obj); + } else { + rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds); + if (rc) + GOTO(out_free_sop_data, rc); } fid_zero(&op_data->fid2); } @@ -897,7 +890,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, op_data->fid2 = fid; /* is obj valid? */ - tgt_exp = lmv_get_export(lmv, &fid); + tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); if (IS_ERR(tgt_exp)) GOTO(cleanup, rc = PTR_ERR(tgt_exp)); diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h index 4e5b397..cb60b10 100644 --- a/lustre/lmv/lmv_internal.h +++ b/lustre/lmv/lmv_internal.h @@ -44,6 +44,7 @@ struct qstr { struct lmv_inode { struct lu_fid li_fid; /* id of dirobj */ + mdsno_t li_mds; /* cached mdsno where @li_fid lives */ unsigned long li_size; /* slave size value */ int li_flags; }; @@ -174,7 +175,13 @@ static inline int lmv_get_easize(struct lmv_obd *lmv) } static inline struct obd_export * -lmv_get_export(struct lmv_obd *lmv, const struct lu_fid *fid) +lmv_get_export(struct lmv_obd *lmv, mdsno_t mds) +{ + return lmv->tgts[mds].ltd_exp; +} + +static inline struct obd_export * +lmv_find_export(struct lmv_obd *lmv, const struct lu_fid *fid) { mdsno_t mds; int rc; @@ -183,7 +190,7 @@ lmv_get_export(struct lmv_obd *lmv, const struct lu_fid *fid) if (rc) return ERR_PTR(rc); - return lmv->tgts[mds].ltd_exp; + return lmv_get_export(lmv, mds); } /* lproc_lmv.c */ diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 3c3df17..e88aa93 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -760,22 +760,22 @@ static int lmv_placement_policy(struct obd_device *obd, */ obj = lmv_obj_grab(obd, hint->ph_pfid); if (obj) { + struct lu_fid *rpid; + int mea_idx; + /* * If the dir got split, alloc fid according to its * hash. No matter what we create, object create should - * go to correct MDS. + * go to correct MDS. */ - struct lu_fid *rpid; - int mea_idx; mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, hint->ph_cname->name, hint->ph_cname->len); rpid = &obj->lo_inodes[mea_idx].li_fid; - rc = lmv_fld_lookup(lmv, rpid, mds); + *mds = obj->lo_inodes[mea_idx].li_mds; lmv_obj_put(obj); - if (rc) - GOTO(exit, rc); + rc = 0; CDEBUG(D_INODE, "The obj "DFID" has been split, got " "MDS at "LPU64" by name %s\n", PFID(hint->ph_pfid), @@ -788,7 +788,7 @@ static int lmv_placement_policy(struct obd_device *obd, } else { /* * Default policy for others is to use parent MDS. - * ONLY directories can be cross-ref during creation + * ONLY directories can be cross-ref during creation. */ rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds); } @@ -1078,7 +1078,7 @@ static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid, if (rc) RETURN(rc); - tgt_exp = lmv_get_export(lmv, fid); + tgt_exp = lmv_find_export(lmv, fid); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -1103,7 +1103,7 @@ static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid, if (rc) RETURN(rc); - tgt_exp = lmv_get_export(lmv, fid); + tgt_exp = lmv_find_export(lmv, fid); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -1128,7 +1128,7 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid, if (rc) RETURN(rc); - tgt_exp = lmv_get_export(lmv, fid); + tgt_exp = lmv_find_export(lmv, fid); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -1219,7 +1219,7 @@ static int lmv_close(struct obd_export *exp, if (rc) RETURN(rc); - tgt_exp = lmv_get_export(lmv, &op_data->fid1); + tgt_exp = lmv_find_export(lmv, &op_data->fid1); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -1249,7 +1249,7 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid) valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA; - tgt_exp = lmv_get_export(lmv, fid); + tgt_exp = lmv_find_export(lmv, fid); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -1311,16 +1311,18 @@ repeat: mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, op_data->name, op_data->namelen); op_data->fid1 = obj->lo_inodes[mea_idx].li_fid; + tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds); lmv_obj_put(obj); + } else { + tgt_exp = lmv_find_export(lmv, &op_data->fid1); } - CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->namelen, - op_data->name, PFID(&op_data->fid1)); - - tgt_exp = lmv_get_export(lmv, &op_data->fid1); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); + CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->namelen, + op_data->name, PFID(&op_data->fid1)); + rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid, cap_effective, rdev, request); if (rc == 0) { @@ -1359,7 +1361,7 @@ static int lmv_done_writing(struct obd_export *exp, if (rc) RETURN(rc); - tgt_exp = lmv_get_export(lmv, &op_data->fid1); + tgt_exp = lmv_find_export(lmv, &op_data->fid1); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -1391,7 +1393,7 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype, memset(op_data2, 0, sizeof(*op_data2)); op_data2->fid1 = mea->mea_ids[i]; - tgt_exp = lmv_get_export(lmv, &op_data2->fid1); + tgt_exp = lmv_find_export(lmv, &op_data2->fid1); if (IS_ERR(tgt_exp)) GOTO(cleanup, rc = PTR_ERR(tgt_exp)); @@ -1473,7 +1475,7 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type, it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; ptlrpc_req_finished(req); - tgt_exp = lmv_get_export(lmv, &fid_copy); + tgt_exp = lmv_find_export(lmv, &fid_copy); if (IS_ERR(tgt_exp)) GOTO(out, rc = PTR_ERR(tgt_exp)); @@ -1505,7 +1507,7 @@ lmv_enqueue(struct obd_export *exp, int lock_type, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; + struct obd_export *tgt_exp = NULL; struct lmv_obj *obj; int rc; ENTRY; @@ -1533,16 +1535,19 @@ lmv_enqueue(struct obd_export *exp, int lock_type, (char *)op_data->name, op_data->namelen); op_data->fid1 = obj->lo_inodes[mea_idx].li_fid; + tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds); lmv_obj_put(obj); } } - CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it), - PFID(&op_data->fid1)); - - tgt_exp = lmv_get_export(lmv, &op_data->fid1); + + if (tgt_exp == NULL) + tgt_exp = lmv_find_export(lmv, &op_data->fid1); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); + CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it), + PFID(&op_data->fid1)); + rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh, lmm, lmmsize, cb_compl, cb_blocking, cb_data, extra_lock_flags); @@ -1582,16 +1587,17 @@ repeat: mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, filename, namelen - 1); rid = obj->lo_inodes[mea_idx].li_fid; + tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds); lmv_obj_put(obj); + } else { + tgt_exp = lmv_find_export(lmv, &rid); } + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); CDEBUG(D_OTHER, "getattr_name for %*s on "DFID" -> "DFID"\n", namelen, filename, PFID(fid), PFID(&rid)); - tgt_exp = lmv_get_export(lmv, &rid); - if (IS_ERR(tgt_exp)) - RETURN(PTR_ERR(tgt_exp)); - rc = md_getattr_name(tgt_exp, &rid, oc, filename, namelen, valid, ea_size, request); if (rc == 0) { @@ -1606,7 +1612,7 @@ repeat: CDEBUG(D_OTHER, "request attrs for "DFID"\n", PFID(&rid)); - tgt_exp = lmv_get_export(lmv, &rid); + tgt_exp = lmv_find_export(lmv, &rid); if (IS_ERR(tgt_exp)) { ptlrpc_req_finished(*request); RETURN(PTR_ERR(tgt_exp)); @@ -1657,13 +1663,14 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, op_data->name, op_data->namelen); op_data->fid2 = obj->lo_inodes[mea_idx].li_fid; + mds = obj->lo_inodes[mea_idx].li_mds; lmv_obj_put(obj); + } else { + rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds); + if (rc) + RETURN(rc); } - rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds); - if (rc) - RETURN(rc); - CDEBUG(D_OTHER,"link "DFID":%*s to "DFID"\n", PFID(&op_data->fid2), op_data->namelen, op_data->name, PFID(&op_data->fid1)); @@ -1695,8 +1702,8 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_obj *obj; - mdsno_t mds, mds2; int rc, mea_idx; + mdsno_t mds; ENTRY; CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n", @@ -1747,8 +1754,13 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)old, oldlen); op_data->fid1 = obj->lo_inodes[mea_idx].li_fid; + mds = obj->lo_inodes[mea_idx].li_mds; CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->fid1)); lmv_obj_put(obj); + } else { + rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds); + if (rc) + RETURN(rc); } obj = lmv_obj_grab(obd, &op_data->fid2); @@ -1764,21 +1776,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->fid2)); lmv_obj_put(obj); } - - rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds); - if (rc) - RETURN(rc); - request: - rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds2); - if (rc) - RETURN(rc); - - if (mds != mds2) { - CDEBUG(D_OTHER,"cross-node rename "DFID"/%*s to "DFID"/%*s\n", - PFID(&op_data->fid1), oldlen, old, - PFID(&op_data->fid2), newlen, new); - } op_data->fsuid = current->fsuid; op_data->fsgid = current->fsgid; op_data->cap = current->cap_effective; @@ -1813,7 +1811,7 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, for (i = 0; i < obj->lo_objcount; i++) { op_data->fid1 = obj->lo_inodes[i].li_fid; - tgt_exp = lmv_get_export(lmv, &op_data->fid1); + tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); if (IS_ERR(tgt_exp)) { rc = PTR_ERR(tgt_exp); break; @@ -1837,7 +1835,7 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, } lmv_obj_put(obj); } else { - tgt_exp = lmv_get_export(lmv, &op_data->fid1); + tgt_exp = lmv_find_export(lmv, &op_data->fid1); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -1860,7 +1858,7 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid, if (rc) RETURN(rc); - tgt_exp = lmv_get_export(lmv, fid); + tgt_exp = lmv_find_export(lmv, fid); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -1937,14 +1935,16 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, do_div(index, (__u32)seg); i = (int)index; rid = obj->lo_inodes[i].li_fid; + tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); lmv_obj_unlock(obj); CDEBUG(D_INFO, "forward to "DFID" with offset %lu i %d\n", PFID(&rid), (unsigned long)offset, i); + } else { + tgt_exp = lmv_find_export(lmv, &rid); } - tgt_exp = lmv_get_export(lmv, &rid); if (IS_ERR(tgt_exp)) GOTO(cleanup, rc = PTR_ERR(tgt_exp)); @@ -1970,10 +1970,11 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, * Here we could remove "." and ".." from all pages which at not from * master. But MDS has only "." and ".." for master dir. */ + EXIT; cleanup: if (obj) lmv_obj_put(obj); - RETURN(rc); + return rc; } static int lmv_unlink_slaves(struct obd_export *exp, @@ -1999,7 +2000,7 @@ static int lmv_unlink_slaves(struct obd_export *exp, op_data2->mode = MDS_MODE_DONT_LOCK | S_IFDIR; op_data2->fsuid = current->fsuid; op_data2->fsgid = current->fsgid; - tgt_exp = lmv_get_export(lmv, &op_data2->fid1); + tgt_exp = lmv_find_export(lmv, &op_data2->fid1); if (IS_ERR(tgt_exp)) GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp)); @@ -2030,7 +2031,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct obd_export *tgt_exp; + struct obd_export *tgt_exp = NULL; int rc; ENTRY; @@ -2055,6 +2056,8 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, op_data->name, op_data->namelen); op_data->fid1 = obj->lo_inodes[mea_idx].li_fid; + tgt_exp = lmv_get_export(lmv, + obj->lo_inodes[mea_idx].li_mds); lmv_obj_put(obj); CDEBUG(D_OTHER, "unlink '%*s' in "DFID" -> %u\n", op_data->namelen, op_data->name, @@ -2064,7 +2067,8 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n", PFID(&op_data->fid1)); } - tgt_exp = lmv_get_export(lmv, &op_data->fid1); + if (tgt_exp == NULL) + tgt_exp = lmv_find_export(lmv, &op_data->fid1); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); op_data->fsuid = current->fsuid; @@ -2433,7 +2437,7 @@ int lmv_set_open_replay_data(struct obd_export *exp, ENTRY; - tgt_exp = lmv_get_export(lmv, &och->och_fid); + tgt_exp = lmv_find_export(lmv, &och->och_fid); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -2448,7 +2452,7 @@ int lmv_clear_open_replay_data(struct obd_export *exp, struct obd_export *tgt_exp; ENTRY; - tgt_exp = lmv_get_export(lmv, &och->och_fid); + tgt_exp = lmv_find_export(lmv, &och->och_fid); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -2470,7 +2474,7 @@ static int lmv_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid, if (rc) RETURN(rc); - tgt_exp = lmv_get_export(lmv, fid); + tgt_exp = lmv_find_export(lmv, fid); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); @@ -2492,7 +2496,7 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc, if (rc) RETURN(rc); - tgt_exp = lmv_get_export(lmv, &oc->c_capa.lc_fid); + tgt_exp = lmv_find_export(lmv, &oc->c_capa.lc_fid); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); diff --git a/lustre/lmv/lmv_object.c b/lustre/lmv/lmv_object.c index 4fb9692..03a0f04 100644 --- a/lustre/lmv/lmv_object.c +++ b/lustre/lmv/lmv_object.c @@ -94,10 +94,21 @@ lmv_obj_alloc(struct obd_device *obd, /* put all ids in */ for (i = 0; i < mea->mea_count; i++) { + int rc; + CDEBUG(D_OTHER, "subobj "DFID"\n", PFID(&mea->mea_ids[i])); obj->lo_inodes[i].li_fid = mea->mea_ids[i]; LASSERT(fid_is_sane(&obj->lo_inodes[i].li_fid)); + + /* + * Cache slave mds number to use it in all cases it is needed + * instead of constant lookup. + */ + rc = lmv_fld_lookup(lmv, &obj->lo_inodes[i].li_fid, + &obj->lo_inodes[i].li_mds); + if (rc) + goto err_obj; } return obj; @@ -308,7 +319,7 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid, md.mea = NULL; valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA; - tgt_exp = lmv_get_export(lmv, fid); + tgt_exp = lmv_find_export(lmv, fid); if (IS_ERR(tgt_exp)) GOTO(cleanup, obj = (void *)tgt_exp); diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index 0ab9c6d..e235d74 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -203,7 +203,7 @@ int mdd_get_md(const struct lu_env *env, struct mdd_object *obj, } else if (rc < 0) { CERROR("Error %d reading eadata \n", rc); } else { - /* FIXME convert lov EA but fixed after verification test */ + /* XXX: convert lov EA but fixed after verification test. */ *md_size = rc; } diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 3dd8245..974cb09 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -316,6 +316,7 @@ static int __mdd_lmv_get(const struct lu_env *env, if (ma->ma_valid & MA_LMV) RETURN(0); + rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size, MDS_LMV_MD_NAME); if (rc > 0) { @@ -351,7 +352,7 @@ static int mdd_attr_get_internal(const struct lu_env *env, } #endif CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n", - rc, ma->ma_valid); + rc, ma->ma_valid); RETURN(rc); } -- 1.8.3.1