From 9f6e73a6078cc6470d48cf8479c136f1dfdd6e35 Mon Sep 17 00:00:00 2001 From: yury Date: Tue, 22 Aug 2006 13:37:54 +0000 Subject: [PATCH] - more cleanups in LMV. --- lustre/lmv/lmv_fld.c | 3 +- lustre/lmv/lmv_intent.c | 72 ++++++----- lustre/lmv/lmv_internal.h | 16 ++- lustre/lmv/lmv_obd.c | 298 ++++++++++++++++++++++++---------------------- lustre/lmv/lmv_object.c | 10 +- 5 files changed, 214 insertions(+), 185 deletions(-) diff --git a/lustre/lmv/lmv_fld.c b/lustre/lmv/lmv_fld.c index d30c0ec..3a4f034 100644 --- a/lustre/lmv/lmv_fld.c +++ b/lustre/lmv/lmv_fld.c @@ -45,11 +45,10 @@ #include #include "lmv_internal.h" -int lmv_fld_lookup(struct obd_device *obd, +int lmv_fld_lookup(struct lmv_obd *lmv, const struct lu_fid *fid, mdsno_t *mds) { - struct lmv_obd *lmv = &obd->u.lmv; int rc; ENTRY; diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index ceffb09..2165daa 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -64,12 +64,13 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm, struct mdt_body *body = NULL; struct lustre_handle plock; struct md_op_data *op_data; + struct obd_export *tgt_exp; struct lu_fid nid; int pmode, rc = 0; - mdsno_t mds; ENTRY; - body = lustre_msg_buf((*reqp)->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body)); + body = lustre_msg_buf((*reqp)->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body)); LASSERT(body != NULL); if (!(body->valid & OBD_MD_MDS)) @@ -106,12 +107,13 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm, GOTO(out, rc = -ENOMEM); op_data->fid1 = nid; - rc = lmv_fld_lookup(obd, &nid, &mds); - if (rc) - RETURN(rc); - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data, - lmm, lmmsize, it, flags, &req, - cb_blocking, extra_lock_flags); + + tgt_exp = lmv_get_export(lmv, &nid); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); + + rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags, + &req, cb_blocking, extra_lock_flags); /* * llite needs LOOKUP lock to track dentry revocation in order to @@ -165,7 +167,7 @@ int lmv_intent_open(struct obd_export *exp, const struct lu_fid *pid, repeat: LASSERT(++loop <= 2); - rc = lmv_fld_lookup(obd, &rpid, &mds); + rc = lmv_fld_lookup(lmv, &rpid, &mds); if (rc) GOTO(out_free_op_data, rc); obj = lmv_obj_grab(obd, &rpid); @@ -298,7 +300,7 @@ int lmv_intent_getattr(struct obd_export *exp, const struct lu_fid *pid, /* caller wants to revalidate attrs of obj we have to revalidate * slaves if requested object is split directory */ CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n", PFID(cid)); - rc = lmv_fld_lookup(obd, cid, &mds); + rc = lmv_fld_lookup(lmv, cid, &mds); if (rc) GOTO(out_free_op_data, rc); #if 0 @@ -308,7 +310,7 @@ int lmv_intent_getattr(struct obd_export *exp, const struct lu_fid *pid, * intent_lock(), but it may change some day */ if (!lu_fid_eq(pid, cid)){ rpid = obj->lo_inodes[mds].li_fid; - rc = lmv_fld_lookup(obd, &rpid, &mds); + rc = lmv_fld_lookup(lmv, &rpid, &mds); if (rc) GOTO(out_free_op_data, rc); } @@ -319,7 +321,7 @@ int lmv_intent_getattr(struct obd_export *exp, const struct lu_fid *pid, } else { CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n", len, name, PFID(pid)); - rc = lmv_fld_lookup(obd, pid, &mds); + rc = lmv_fld_lookup(lmv, pid, &mds); if (rc) GOTO(out_free_op_data, rc); obj = lmv_obj_grab(obd, pid); @@ -328,7 +330,7 @@ int lmv_intent_getattr(struct obd_export *exp, const struct lu_fid *pid, mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)name, len); rpid = obj->lo_inodes[mds].li_fid; - rc = lmv_fld_lookup(obd, &rpid, &mds); + rc = lmv_fld_lookup(lmv, &rpid, &mds); if (rc) GOTO(out_free_op_data, rc); lmv_obj_put(obj); @@ -475,8 +477,8 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) for (i = 0; i < obj->lo_objcount; i++) { struct lu_fid fid = obj->lo_inodes[i].li_fid; struct ptlrpc_request *req = NULL; + struct obd_export *tgt_exp; struct lookup_intent it; - mdsno_t mds; if (lu_fid_eq(&fid, &obj->lo_fid)) /* skip master obj */ @@ -492,11 +494,11 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) op_data->fid1 = fid; op_data->fid2 = fid; - rc = lmv_fld_lookup(obd, &fid, &mds); - if (rc) - GOTO(cleanup, rc); - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data, - NULL, 0, &it, 0, &req, + tgt_exp = lmv_get_export(lmv, &fid); + if (IS_ERR(tgt_exp)) + GOTO(cleanup, rc = PTR_ERR(tgt_exp)); + + rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, lmv_blocking_ast, 0); lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle; @@ -516,7 +518,8 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) lock->l_ast_data = lmv_obj_get(obj); - body2 = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body2)); + body2 = lustre_msg_buf(req->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body2)); LASSERT(body2); obj->lo_inodes[i].li_size = body2->size; @@ -584,7 +587,7 @@ int lmv_intent_lookup(struct obd_export *exp, const struct lu_fid *pid, rpid = obj->lo_inodes[mds].li_fid; lmv_obj_put(obj); } - rc = lmv_fld_lookup(obd, &rpid, &mds); + rc = lmv_fld_lookup(lmv, &rpid, &mds); if (rc) GOTO(out_free_op_data, rc); @@ -593,7 +596,7 @@ int lmv_intent_lookup(struct obd_export *exp, const struct lu_fid *pid, op_data->fid2 = *cid; } else { - rc = lmv_fld_lookup(obd, pid, &mds); + rc = lmv_fld_lookup(lmv, pid, &mds); if (rc) GOTO(out_free_op_data, rc); repeat: @@ -611,7 +614,7 @@ repeat: mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, (char *)name, len); rpid = obj->lo_inodes[mds].li_fid; - rc = lmv_fld_lookup(obd, &rpid, &mds); + rc = lmv_fld_lookup(lmv, &rpid, &mds); if (rc) GOTO(out_free_op_data, rc); } @@ -699,6 +702,7 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data, int extra_lock_flags) { struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; const char *name = op_data->name; int len = op_data->namelen; struct lu_fid *pid, *cid; @@ -711,9 +715,10 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data, pid = fid_is_sane(&op_data->fid1) ? &op_data->fid1 : NULL; cid = fid_is_sane(&op_data->fid2) ? &op_data->fid2 : NULL; - rc = lmv_fld_lookup(obd, pid, &mds); + rc = lmv_fld_lookup(lmv, pid, &mds); if (rc) RETURN(rc); + CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID" -> #"LPU64"\n", LL_IT2STR(it), len, name, PFID(pid), mds); @@ -747,6 +752,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, struct ptlrpc_request *mreq = *reqp; struct lmv_obd *lmv = &obd->u.lmv; struct lustre_handle master_lockh; + struct obd_export *tgt_exp; struct md_op_data *op_data; struct ldlm_lock *lock; unsigned long size = 0; @@ -754,7 +760,6 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, struct lmv_obj *obj; int master_lock_mode; int i, rc = 0; - mdsno_t mds; ENTRY; OBD_ALLOC_PTR(op_data); @@ -816,11 +821,13 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, op_data->fid2 = fid; /* is obj valid? */ - rc = lmv_fld_lookup(obd, &fid, &mds); - if (rc) - GOTO(out_free_op_data, rc); - rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data, - NULL, 0, &it, 0, &req, cb, extra_lock_flags); + tgt_exp = lmv_get_export(lmv, &fid); + if (IS_ERR(tgt_exp)) + GOTO(out_free_op_data, rc = PTR_ERR(tgt_exp)); + + rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, cb, + extra_lock_flags); + lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle; if (rc > 0 && req == NULL) { /* nice, this slave is valid */ @@ -857,7 +864,8 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, } - body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body)); + body = lustre_msg_buf(req->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body)); LASSERT(body); update: @@ -896,7 +904,7 @@ release_lock: body->valid = OBD_MD_FLSIZE; #if 0 - rc = lmv_fld_lookup(obd, &obj->lo_fid, &body->mds); + rc = lmv_fld_lookup(lmv, &obj->lo_fid, &body->mds); if (rc) GOTO(cleanup, rc); #endif diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h index 992db96..5e9b2b4 100644 --- a/lustre/lmv/lmv_internal.h +++ b/lustre/lmv/lmv_internal.h @@ -23,6 +23,7 @@ #define _LMV_INTERNAL_H_ #include +#include #ifndef __KERNEL__ /* XXX: dirty hack, needs to be fixed more clever way. */ @@ -133,7 +134,7 @@ int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **, int lmv_handle_split(struct obd_export *, const struct lu_fid *); int lmv_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *, void *, int); -int lmv_fld_lookup(struct obd_device *obd, const struct lu_fid *fid, +int lmv_fld_lookup(struct lmv_obd *lmv, const struct lu_fid *fid, mdsno_t *mds); static inline struct lmv_stripe_md * @@ -166,6 +167,19 @@ static inline int lmv_get_easize(struct lmv_obd *lmv) sizeof(struct lu_fid); } +static inline struct obd_export * +lmv_get_export(struct lmv_obd *lmv, const struct lu_fid *fid) +{ + mdsno_t mds; + int rc; + + rc = lmv_fld_lookup(lmv, fid, &mds); + if (rc) + return ERR_PTR(rc); + + return lmv->tgts[mds].ltd_exp; +} + /* lproc_lmv.c */ extern struct file_operations lmv_proc_target_fops; diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index b32251d..3d4713b 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -681,7 +681,7 @@ static int lmv_placement_policy(struct obd_device *obd, } else { /* default policy is to use parent MDS */ LASSERT(fid_is_sane(hint->ph_pfid)); - rc = lmv_fld_lookup(obd, hint->ph_pfid, mds); + rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds); } } else { /* sequences among all tgts are not well balanced, allocate new @@ -968,16 +968,14 @@ static int lmv_getstatus(struct obd_export *exp, RETURN(rc); } -static int lmv_getxattr(struct obd_export *exp, - const struct lu_fid *fid, - obd_valid valid, const char *name, - const char *input, int input_size, - int output_size, int flags, +static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid, + obd_valid valid, const char *name, const char *input, + int input_size, int output_size, int flags, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - mdsno_t mds; + struct obd_export *tgt_exp; int rc; ENTRY; @@ -985,26 +983,24 @@ static int lmv_getxattr(struct obd_export *exp, if (rc) RETURN(rc); - rc = lmv_fld_lookup(obd, fid, &mds); - if (rc) - RETURN(rc); + tgt_exp = lmv_get_export(lmv, fid); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); - rc = md_getxattr(lmv->tgts[mds].ltd_exp, fid, valid, name, - input, input_size, output_size, flags, request); + rc = md_getxattr(tgt_exp, fid, valid, name, input, input_size, + output_size, flags, request); RETURN(rc); } -static int lmv_setxattr(struct obd_export *exp, - const struct lu_fid *fid, - obd_valid valid, const char *name, - const char *input, int input_size, - int output_size, int flags, +static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid, + obd_valid valid, const char *name, const char *input, + int input_size, int output_size, int flags, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - mdsno_t mds; + struct obd_export *tgt_exp; int rc; ENTRY; @@ -1012,25 +1008,24 @@ static int lmv_setxattr(struct obd_export *exp, if (rc) RETURN(rc); - rc = lmv_fld_lookup(obd, fid, &mds); - if (rc) - RETURN(rc); + tgt_exp = lmv_get_export(lmv, fid); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); - rc = md_setxattr(lmv->tgts[mds].ltd_exp, fid, valid, name, + rc = md_setxattr(tgt_exp, fid, valid, name, input, input_size, output_size, flags, request); RETURN(rc); } -static int lmv_getattr(struct obd_export *exp, - const struct lu_fid *fid, +static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid, obd_valid valid, int ea_size, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; + struct obd_export *tgt_exp; struct lmv_obj *obj; - mdsno_t mds; int rc, i; ENTRY; @@ -1038,12 +1033,11 @@ static int lmv_getattr(struct obd_export *exp, if (rc) RETURN(rc); - rc = lmv_fld_lookup(obd, fid, &mds); - if (rc) - RETURN(rc); + tgt_exp = lmv_get_export(lmv, fid); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); - rc = md_getattr(lmv->tgts[mds].ltd_exp, fid, valid, - ea_size, request); + rc = md_getattr(tgt_exp, fid, valid, ea_size, request); if (rc) RETURN(rc); @@ -1124,7 +1118,7 @@ static int lmv_close(struct obd_export *exp, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - mdsno_t mds; + struct obd_export *tgt_exp; int rc; ENTRY; @@ -1132,12 +1126,12 @@ static int lmv_close(struct obd_export *exp, if (rc) RETURN(rc); - rc = lmv_fld_lookup(obd, &op_data->fid1, &mds); - if (rc) - RETURN(rc); + tgt_exp = lmv_get_export(lmv, &op_data->fid1); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); CDEBUG(D_OTHER, "CLOSE "DFID"\n", PFID(&op_data->fid1)); - rc = md_close(lmv->tgts[mds].ltd_exp, op_data, och, request); + rc = md_close(tgt_exp, op_data, och, request); RETURN(rc); } @@ -1148,10 +1142,10 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid) struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct ptlrpc_request *req = NULL; + struct obd_export *tgt_exp; struct lmv_obj *obj; struct lustre_md md; int mealen, rc; - mdsno_t mds; __u64 valid; ENTRY; @@ -1160,20 +1154,18 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid) valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA; - rc = lmv_fld_lookup(obd, fid, &mds); - if (rc) - RETURN(rc); + tgt_exp = lmv_get_export(lmv, fid); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); /* time to update mea of parent fid */ - rc = md_getattr(lmv->tgts[mds].ltd_exp, fid, valid, - mealen, &req); + rc = md_getattr(tgt_exp, fid, valid, mealen, &req); if (rc) { CERROR("md_getattr() failed, error %d\n", rc); GOTO(cleanup, rc); } - rc = md_get_lustre_md(lmv->tgts[mds].ltd_exp, req, 0, - NULL, &md); + rc = md_get_lustre_md(tgt_exp, req, 0, NULL, &md); if (rc) { CERROR("mdc_get_lustre_md() failed, error %d\n", rc); GOTO(cleanup, rc); @@ -1204,10 +1196,10 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; + struct obd_export *tgt_exp; struct mdt_body *body; struct lmv_obj *obj; int rc, loop = 0; - mdsno_t mds; ENTRY; rc = lmv_check_connect(obd); @@ -1220,6 +1212,8 @@ repeat: LASSERT(++loop <= 2); obj = lmv_obj_grab(obd, &op_data->fid1); if (obj) { + mdsno_t mds; + mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, op_data->name, op_data->namelen); op_data->fid1 = obj->lo_inodes[mds].li_fid; @@ -1229,12 +1223,12 @@ repeat: CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->namelen, op_data->name, PFID(&op_data->fid1)); - rc = lmv_fld_lookup(obd, &op_data->fid1, &mds); - if (rc) - RETURN(rc); + tgt_exp = lmv_get_export(lmv, &op_data->fid1); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); - rc = md_create(lmv->tgts[mds].ltd_exp, op_data, data, datalen, - mode, uid, gid, cap_effective, rdev, request); + rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid, + cap_effective, rdev, request); if (rc == 0) { if (*request == NULL) RETURN(rc); @@ -1262,7 +1256,7 @@ static int lmv_done_writing(struct obd_export *exp, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - mdsno_t mds; + struct obd_export *tgt_exp; int rc; ENTRY; @@ -1270,10 +1264,11 @@ static int lmv_done_writing(struct obd_export *exp, if (rc) RETURN(rc); - rc = lmv_fld_lookup(obd, &op_data->fid1, &mds); - if (rc) - RETURN(rc); - rc = md_done_writing(lmv->tgts[mds].ltd_exp, op_data); + tgt_exp = lmv_get_export(lmv, &op_data->fid1); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); + + rc = md_done_writing(tgt_exp, op_data); RETURN(rc); } @@ -1288,8 +1283,8 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype, struct lmv_obd *lmv = &obd->u.lmv; struct lmv_stripe_md *mea = op_data->mea1; struct md_op_data *op_data2; + struct obd_export *tgt_exp; int i, rc = 0; - mdsno_t mds; ENTRY; OBD_ALLOC_PTR(op_data2); @@ -1300,21 +1295,24 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype, for (i = 0; i < mea->mea_count; i++) { memset(op_data2, 0, sizeof(*op_data2)); op_data2->fid1 = mea->mea_ids[i]; - rc = lmv_fld_lookup(obd, &op_data2->fid1, &mds); - if (rc) - GOTO(cleanup, rc); - if (lmv->tgts[mds].ltd_exp == NULL) + tgt_exp = lmv_get_export(lmv, &op_data2->fid1); + if (IS_ERR(tgt_exp)) + GOTO(cleanup, rc = PTR_ERR(tgt_exp)); + + if (tgt_exp == NULL) continue; - rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, - lockmode, op_data2, lockh + i, lmm, lmmsize, - cb_compl, cb_blocking, cb_data, 0); + rc = md_enqueue(tgt_exp, locktype, it, lockmode, op_data2, + lockh + i, lmm, lmmsize, cb_compl, cb_blocking, + cb_data, 0); CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n", PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status); + if (rc) GOTO(cleanup, rc); + if (it->d.lustre.it_data) { struct ptlrpc_request *req; req = (struct ptlrpc_request *)it->d.lustre.it_data; @@ -1353,9 +1351,9 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type, struct lmv_obd *lmv = &obd->u.lmv; struct mdt_body *body = NULL; struct lustre_handle plock; + struct obd_export *tgt_exp; struct md_op_data *rdata; int rc = 0, pmode; - mdsno_t mds; ENTRY; body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body)); @@ -1384,13 +1382,13 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type, it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE; ptlrpc_req_finished(req); - rc = lmv_fld_lookup(obd, &rdata->fid1, &mds); - if (rc) - GOTO(out_free_rdata, rc); - rc = md_enqueue(lmv->tgts[mds].ltd_exp, - lock_type, it, lock_mode, rdata, lockh, lmm, - lmmsize, cb_compl, cb_blocking, cb_data, - extra_lock_flags); + tgt_exp = lmv_get_export(lmv, &rdata->fid1); + if (IS_ERR(tgt_exp)) + GOTO(out_free_rdata, rc = PTR_ERR(tgt_exp)); + + rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata, + lockh, lmm, lmmsize, cb_compl, cb_blocking, + cb_data, extra_lock_flags); ldlm_lock_decref(&plock, pmode); EXIT; @@ -1409,8 +1407,8 @@ lmv_enqueue(struct obd_export *exp, int lock_type, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; + struct obd_export *tgt_exp; struct lmv_obj *obj; - mdsno_t mds; int rc; ENTRY; @@ -1428,6 +1426,8 @@ lmv_enqueue(struct obd_export *exp, int lock_type, if (op_data->namelen) { obj = lmv_obj_grab(obd, &op_data->fid1); if (obj) { + mdsno_t mds; + /* directory is split. look for right mds for this * name */ mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, @@ -1439,13 +1439,14 @@ lmv_enqueue(struct obd_export *exp, int lock_type, CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it), PFID(&op_data->fid1)); - rc = lmv_fld_lookup(obd, &op_data->fid1, &mds); - if (rc) - RETURN(rc); - rc = md_enqueue(lmv->tgts[mds].ltd_exp, - lock_type, it, lock_mode, op_data, lockh, lmm, - lmmsize, cb_compl, cb_blocking, cb_data, + tgt_exp = lmv_get_export(lmv, &op_data->fid1); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); + + rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh, + lmm, lmmsize, cb_compl, cb_blocking, cb_data, extra_lock_flags); + if (rc == 0 && it->it_op == IT_OPEN) rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode, op_data, lockh, lmm, lmmsize, @@ -1461,10 +1462,11 @@ lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; + struct obd_export *tgt_exp; struct lu_fid rid = *fid; - int rc, loop = 0; struct mdt_body *body; struct lmv_obj *obj; + int rc, loop = 0; mdsno_t mds; ENTRY; @@ -1472,7 +1474,7 @@ lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid, if (rc) RETURN(rc); - rc = lmv_fld_lookup(obd, fid, &mds); + rc = lmv_fld_lookup(lmv, fid, &mds); if (rc) RETURN(rc); repeat: @@ -1489,31 +1491,32 @@ repeat: CDEBUG(D_OTHER, "getattr_lock for %*s on "DFID" -> "DFID"\n", namelen, filename, PFID(fid), PFID(&rid)); - rc = lmv_fld_lookup(obd, &rid, &mds); - if (rc) - RETURN(rc); + tgt_exp = lmv_get_export(lmv, &rid); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); - rc = md_getattr_name(lmv->tgts[mds].ltd_exp, - &rid, filename, namelen, - valid, ea_size, request); + rc = md_getattr_name(tgt_exp, &rid, filename, namelen, valid, + ea_size, request); if (rc == 0) { - body = lustre_msg_buf((*request)->rq_repmsg, REQ_REC_OFF, sizeof(*body)); + body = lustre_msg_buf((*request)->rq_repmsg, + REQ_REC_OFF, sizeof(*body)); LASSERT(body != NULL); if (body->valid & OBD_MD_MDS) { struct ptlrpc_request *req = NULL; rid = body->fid1; - CDEBUG(D_OTHER, "request attrs for "DFID"\n", PFID(&rid)); + CDEBUG(D_OTHER, "request attrs for "DFID"\n", + PFID(&rid)); - rc = lmv_fld_lookup(obd, &rid, &mds); - if (rc) { + tgt_exp = lmv_get_export(lmv, &rid); + if (IS_ERR(tgt_exp)) { ptlrpc_req_finished(*request); - RETURN(rc); + RETURN(PTR_ERR(tgt_exp)); } - rc = md_getattr_name(lmv->tgts[mds].ltd_exp, - &rid, NULL, 1, valid, ea_size, &req); + rc = md_getattr_name(tgt_exp, &rid, NULL, 1, valid, + ea_size, &req); ptlrpc_req_finished(*request); *request = req; } @@ -1557,7 +1560,7 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, lmv_obj_put(obj); } - rc = lmv_fld_lookup(obd, &op_data->fid2, &mds); + rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds); if (rc) RETURN(rc); @@ -1565,7 +1568,7 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, PFID(&op_data->fid2), op_data->namelen, op_data->name, PFID(&op_data->fid1)); } else { - rc = lmv_fld_lookup(obd, &op_data->fid1, &mds); + rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds); if (rc) RETURN(rc); @@ -1610,7 +1613,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, "to "DFID"\n", newlen, new, oldlen, newlen, PFID(&op_data->fid2), PFID(&op_data->fid1)); - rc = lmv_fld_lookup(obd, &op_data->fid2, &mds); + rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds); if (rc) RETURN(rc); @@ -1659,13 +1662,12 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, lmv_obj_put(obj); } - rc = lmv_fld_lookup(obd, &op_data->fid1, &mds); + rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds); if (rc) RETURN(rc); - request: - rc = lmv_fld_lookup(obd, &op_data->fid2, &mds2); + rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds2); if (rc) RETURN(rc); @@ -1687,10 +1689,10 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct ptlrpc_request *req; + struct obd_export *tgt_exp; struct mdt_body *body; struct lmv_obj *obj; int rc = 0, i; - mdsno_t mds; ENTRY; rc = lmv_check_connect(obd); @@ -1706,13 +1708,14 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, for (i = 0; i < obj->lo_objcount; i++) { op_data->fid1 = obj->lo_inodes[i].li_fid; - rc = lmv_fld_lookup(obd, &op_data->fid1, &mds); - if (rc) + tgt_exp = lmv_get_export(lmv, &op_data->fid1); + if (IS_ERR(tgt_exp)) { + rc = PTR_ERR(tgt_exp); break; + } - rc = md_setattr(lmv->tgts[mds].ltd_exp, - op_data, iattr, ea, ealen, ea2, - ea2len, &req); + rc = md_setattr(tgt_exp, op_data, iattr, ea, ealen, + ea2, ea2len, &req); if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) { /* @@ -1729,12 +1732,12 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, } lmv_obj_put(obj); } else { - rc = lmv_fld_lookup(obd, &op_data->fid1, &mds); - if (rc) - RETURN(rc); + tgt_exp = lmv_get_export(lmv, &op_data->fid1); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); - rc = md_setattr(lmv->tgts[mds].ltd_exp, op_data, iattr, ea, - ealen, ea2, ea2len, request); + rc = md_setattr(tgt_exp, op_data, iattr, ea, ealen, ea2, + ea2len, request); if (rc == 0) { body = lustre_msg_buf((*request)->rq_repmsg, REQ_REC_OFF, sizeof(*body)); @@ -1749,7 +1752,7 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - mdsno_t mds; + struct obd_export *tgt_exp; int rc; ENTRY; @@ -1757,11 +1760,11 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid, if (rc) RETURN(rc); - rc = lmv_fld_lookup(obd, fid, &mds); - if (rc) - RETURN(rc); - rc = md_sync(lmv->tgts[mds].ltd_exp, - fid, request); + tgt_exp = lmv_get_export(lmv, fid); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); + + rc = md_sync(tgt_exp, fid, request); RETURN(rc); } @@ -1805,6 +1808,8 @@ int lmv_blocking_ast(struct ldlm_lock *lock, RETURN(0); } +#if 0 +/* not needed for CMD3 because only dir on master has "." and ".." */ static void lmv_remove_dots(struct page *page) { unsigned limit = PAGE_CACHE_SIZE; @@ -1821,6 +1826,7 @@ static void lmv_remove_dots(struct page *page) p->inode = 0; } } +#endif static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, @@ -1829,9 +1835,9 @@ static int lmv_readpage(struct obd_export *exp, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; + struct obd_export *tgt_exp; struct lu_fid rid = *fid; struct lmv_obj *obj; - mdsno_t mds; int i, rc; ENTRY; @@ -1861,24 +1867,25 @@ static int lmv_readpage(struct obd_export *exp, PFID(&rid), (unsigned long)offset); } - rc = lmv_fld_lookup(obd, &rid, &mds); - if (rc) - RETURN(rc); - - rc = md_readpage(lmv->tgts[mds].ltd_exp, &rid, - offset, page, request); + tgt_exp = lmv_get_export(lmv, &rid); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); - if (0 && rc == 0 && !lu_fid_eq(&rid, fid)) + rc = md_readpage(tgt_exp, &rid, offset, page, request); + +#if 0 + if (rc == 0 && !lu_fid_eq(&rid, fid)) /* * This page isn't from master object. To avoid "." and ".." * duplication in directory, we have to remove them from all * slave objects * - * XXX this is not needed for cmd3 readdir, because only - * master directory has dot and dotdot. + * XXX this is not needed for cmd3 readdir, because only master + * directory has dot and dotdot. */ lmv_remove_dots(page); - +#endif + RETURN(rc); } @@ -1890,8 +1897,8 @@ static int lmv_unlink_slaves(struct obd_export *exp, struct lmv_obd *lmv = &obd->u.lmv; struct lmv_stripe_md *mea = op_data->mea1; struct md_op_data *op_data2; + struct obd_export *tgt_exp; int i, rc = 0; - mdsno_t mds; ENTRY; OBD_ALLOC_PTR(op_data2); @@ -1904,14 +1911,14 @@ static int lmv_unlink_slaves(struct obd_export *exp, op_data2->fid1 = mea->mea_ids[i]; op_data2->create_mode = MDS_MODE_DONT_LOCK | S_IFDIR; - rc = lmv_fld_lookup(obd, &op_data2->fid1, &mds); - if (rc) - GOTO(out_free_op_data2, rc); - if (lmv->tgts[mds].ltd_exp == NULL) + tgt_exp = lmv_get_export(lmv, &op_data2->fid1); + if (IS_ERR(tgt_exp)) + GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp)); + + if (tgt_exp == NULL) continue; - rc = md_unlink(lmv->tgts[mds].ltd_exp, - op_data2, req); + rc = md_unlink(tgt_exp, op_data2, req); CDEBUG(D_OTHER, "unlink slave "DFID" -> %d\n", PFID(&mea->mea_ids[i]), rc); @@ -1935,7 +1942,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - mdsno_t mds; + struct obd_export *tgt_exp; int rc, i; ENTRY; @@ -1966,10 +1973,11 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n", PFID(&op_data->fid1)); } - rc = lmv_fld_lookup(obd, &op_data->fid1, &mds); - if (rc) - RETURN(rc); - rc = md_unlink(lmv->tgts[mds].ltd_exp, op_data, request); + tgt_exp = lmv_get_export(lmv, &op_data->fid1); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); + + rc = md_unlink(tgt_exp, op_data, request); RETURN(rc); } @@ -2216,18 +2224,18 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen, if (keylen == strlen("ids") && memcmp(key, "ids", keylen) == 0) { struct lu_fid *fid = (struct lu_fid *)val; - mdsno_t mds; + struct obd_export *tgt_exp; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - rc = lmv_fld_lookup(obd, fid, &mds); - if (rc) - RETURN(rc); - rc = obd_set_info_async(lmv->tgts[mds].ltd_exp, - keylen, key, vallen, val, - set); + tgt_exp = lmv_get_export(lmv, fid); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); + + rc = obd_set_info_async(tgt_exp, keylen, key, vallen, + val, set); RETURN(rc); } diff --git a/lustre/lmv/lmv_object.c b/lustre/lmv/lmv_object.c index 274e7b1..93d8709 100644 --- a/lustre/lmv/lmv_object.c +++ b/lustre/lmv/lmv_object.c @@ -286,10 +286,10 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct ptlrpc_request *req = NULL; + struct obd_export *tgt_exp; struct lmv_obj *obj; struct lustre_md md; int mealen, rc; - mdsno_t mds; ENTRY; CDEBUG(D_OTHER, "get mea for "DFID" and create lmv obj\n", @@ -307,11 +307,11 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid, md.mea = NULL; valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA; - rc = lmv_fld_lookup(obd, fid, &mds); - if (rc) - GOTO(cleanup, obj = ERR_PTR(rc)); + tgt_exp = lmv_get_export(lmv, fid); + if (IS_ERR(tgt_exp)) + GOTO(cleanup, obj = (void *)tgt_exp); - rc = md_getattr(lmv->tgts[mds].ltd_exp, fid, valid, mealen, &req); + rc = md_getattr(tgt_exp, fid, valid, mealen, &req); if (rc) { CERROR("md_getattr() failed, error %d\n", rc); GOTO(cleanup, obj = ERR_PTR(rc)); -- 1.8.3.1