Whamcloud - gitweb
- more cleanups in LMV.
authoryury <yury>
Tue, 22 Aug 2006 13:37:54 +0000 (13:37 +0000)
committeryury <yury>
Tue, 22 Aug 2006 13:37:54 +0000 (13:37 +0000)
lustre/lmv/lmv_fld.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/lmv/lmv_object.c

index d30c0ec..3a4f034 100644 (file)
 #include <lprocfs_status.h>
 #include "lmv_internal.h"
 
-int lmv_fld_lookup(struct obd_device *obd,
+int lmv_fld_lookup(struct lmv_obd *lmv,
                    const struct lu_fid *fid,
                    mdsno_t *mds)
 {
-        struct lmv_obd *lmv = &obd->u.lmv;
         int rc;
         ENTRY;
 
index ceffb09..2165daa 100644 (file)
@@ -64,12 +64,13 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm,
         struct mdt_body *body = NULL;
         struct lustre_handle plock;
         struct md_op_data *op_data;
+        struct obd_export *tgt_exp;
         struct lu_fid nid;
         int pmode, rc = 0;
-        mdsno_t mds;
         ENTRY;
 
-        body = lustre_msg_buf((*reqp)->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
+        body = lustre_msg_buf((*reqp)->rq_repmsg,
+                              DLM_REPLY_REC_OFF, sizeof(*body));
         LASSERT(body != NULL);
 
         if (!(body->valid & OBD_MD_MDS))
@@ -106,12 +107,13 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm,
                 GOTO(out, rc = -ENOMEM);
         
         op_data->fid1 = nid;
-        rc = lmv_fld_lookup(obd, &nid, &mds);
-        if (rc)
-                RETURN(rc);
-        rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data,
-                            lmm, lmmsize, it, flags, &req,
-                            cb_blocking, extra_lock_flags);
+
+        tgt_exp = lmv_get_export(lmv, &nid);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
+
+        rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags,
+                            &req, cb_blocking, extra_lock_flags);
 
         /*
          * llite needs LOOKUP lock to track dentry revocation in order to
@@ -165,7 +167,7 @@ int lmv_intent_open(struct obd_export *exp, const struct lu_fid *pid,
 
 repeat:
         LASSERT(++loop <= 2);
-        rc = lmv_fld_lookup(obd, &rpid, &mds);
+        rc = lmv_fld_lookup(lmv, &rpid, &mds);
         if (rc)
                 GOTO(out_free_op_data, rc);
         obj = lmv_obj_grab(obd, &rpid);
@@ -298,7 +300,7 @@ int lmv_intent_getattr(struct obd_export *exp, const struct lu_fid *pid,
                 /* caller wants to revalidate attrs of obj we have to revalidate
                  * slaves if requested object is split directory */
                 CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n", PFID(cid));
-                rc = lmv_fld_lookup(obd, cid, &mds);
+                rc = lmv_fld_lookup(lmv, cid, &mds);
                 if (rc)
                         GOTO(out_free_op_data, rc);
 #if 0
@@ -308,7 +310,7 @@ int lmv_intent_getattr(struct obd_export *exp, const struct lu_fid *pid,
                          * intent_lock(), but it may change some day */
                         if (!lu_fid_eq(pid, cid)){
                                 rpid = obj->lo_inodes[mds].li_fid;
-                                rc = lmv_fld_lookup(obd, &rpid, &mds);
+                                rc = lmv_fld_lookup(lmv, &rpid, &mds);
                                 if (rc)
                                         GOTO(out_free_op_data, rc);
                         }
@@ -319,7 +321,7 @@ int lmv_intent_getattr(struct obd_export *exp, const struct lu_fid *pid,
         } else {
                 CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n",
                        len, name, PFID(pid));
-                rc = lmv_fld_lookup(obd, pid, &mds);
+                rc = lmv_fld_lookup(lmv, pid, &mds);
                 if (rc)
                         GOTO(out_free_op_data, rc);
                 obj = lmv_obj_grab(obd, pid);
@@ -328,7 +330,7 @@ int lmv_intent_getattr(struct obd_export *exp, const struct lu_fid *pid,
                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                            (char *)name, len);
                         rpid = obj->lo_inodes[mds].li_fid;
-                        rc = lmv_fld_lookup(obd, &rpid, &mds);
+                        rc = lmv_fld_lookup(lmv, &rpid, &mds);
                         if (rc)
                                 GOTO(out_free_op_data, rc);
                         lmv_obj_put(obj);
@@ -475,8 +477,8 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
         for (i = 0; i < obj->lo_objcount; i++) {
                 struct lu_fid fid = obj->lo_inodes[i].li_fid;
                 struct ptlrpc_request *req = NULL;
+                struct obd_export *tgt_exp;
                 struct lookup_intent it;
-                mdsno_t mds;
 
                 if (lu_fid_eq(&fid, &obj->lo_fid))
                         /* skip master obj */
@@ -492,11 +494,11 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 op_data->fid1 = fid;
                 op_data->fid2 = fid;
 
-                rc = lmv_fld_lookup(obd, &fid, &mds);
-                if (rc)
-                        GOTO(cleanup, rc);
-                rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data,
-                                    NULL, 0, &it, 0, &req,
+                tgt_exp = lmv_get_export(lmv, &fid);
+                if (IS_ERR(tgt_exp))
+                        GOTO(cleanup, rc = PTR_ERR(tgt_exp));
+
+                rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req,
                                     lmv_blocking_ast, 0);
 
                 lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
@@ -516,7 +518,8 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
 
                 lock->l_ast_data = lmv_obj_get(obj);
 
-                body2 = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body2));
+                body2 = lustre_msg_buf(req->rq_repmsg,
+                                       DLM_REPLY_REC_OFF, sizeof(*body2));
                 LASSERT(body2);
 
                 obj->lo_inodes[i].li_size = body2->size;
@@ -584,7 +587,7 @@ int lmv_intent_lookup(struct obd_export *exp, const struct lu_fid *pid,
                         rpid = obj->lo_inodes[mds].li_fid;
                         lmv_obj_put(obj);
                 }
-                rc = lmv_fld_lookup(obd, &rpid, &mds);
+                rc = lmv_fld_lookup(lmv, &rpid, &mds);
                 if (rc)
                         GOTO(out_free_op_data, rc);
 
@@ -593,7 +596,7 @@ int lmv_intent_lookup(struct obd_export *exp, const struct lu_fid *pid,
 
                 op_data->fid2 = *cid;
         } else {
-                rc = lmv_fld_lookup(obd, pid, &mds);
+                rc = lmv_fld_lookup(lmv, pid, &mds);
                 if (rc)
                         GOTO(out_free_op_data, rc);
 repeat:
@@ -611,7 +614,7 @@ repeat:
                                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                                    (char *)name, len);
                                 rpid = obj->lo_inodes[mds].li_fid;
-                                rc = lmv_fld_lookup(obd, &rpid, &mds);
+                                rc = lmv_fld_lookup(lmv, &rpid, &mds);
                                 if (rc)
                                         GOTO(out_free_op_data, rc);
                         }
@@ -699,6 +702,7 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                     int extra_lock_flags)
 {
         struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
         const char *name = op_data->name;
         int len = op_data->namelen;
         struct lu_fid *pid, *cid;
@@ -711,9 +715,10 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
         pid = fid_is_sane(&op_data->fid1) ? &op_data->fid1 : NULL;
         cid = fid_is_sane(&op_data->fid2) ? &op_data->fid2 : NULL;
 
-        rc = lmv_fld_lookup(obd, pid, &mds);
+        rc = lmv_fld_lookup(lmv, pid, &mds);
         if (rc)
                 RETURN(rc);
+        
         CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID" -> #"LPU64"\n",
                LL_IT2STR(it), len, name, PFID(pid), mds);
 
@@ -747,6 +752,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
         struct ptlrpc_request *mreq = *reqp;
         struct lmv_obd *lmv = &obd->u.lmv;
         struct lustre_handle master_lockh;
+        struct obd_export *tgt_exp;
         struct md_op_data *op_data;
         struct ldlm_lock *lock;
         unsigned long size = 0;
@@ -754,7 +760,6 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
         struct lmv_obj *obj;
         int master_lock_mode;
         int i, rc = 0;
-        mdsno_t mds;
         ENTRY;
 
         OBD_ALLOC_PTR(op_data);
@@ -816,11 +821,13 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                 op_data->fid2 = fid;
 
                 /* is obj valid? */
-                rc = lmv_fld_lookup(obd, &fid, &mds);
-                if (rc)
-                        GOTO(out_free_op_data, rc);
-                rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data,
-                                    NULL, 0, &it, 0, &req, cb, extra_lock_flags);
+                tgt_exp = lmv_get_export(lmv, &fid);
+                if (IS_ERR(tgt_exp))
+                        GOTO(out_free_op_data, rc = PTR_ERR(tgt_exp));
+
+                rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, cb,
+                                    extra_lock_flags);
+                
                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
                 if (rc > 0 && req == NULL) {
                         /* nice, this slave is valid */
@@ -857,7 +864,8 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
 
                 }
 
-                body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
+                body = lustre_msg_buf(req->rq_repmsg,
+                                      DLM_REPLY_REC_OFF, sizeof(*body));
                 LASSERT(body);
 
 update:
@@ -896,7 +904,7 @@ release_lock:
                         body->valid = OBD_MD_FLSIZE;
                         
 #if 0
-                        rc = lmv_fld_lookup(obd, &obj->lo_fid, &body->mds);
+                        rc = lmv_fld_lookup(lmv, &obj->lo_fid, &body->mds);
                         if (rc)
                                 GOTO(cleanup, rc);
 #endif
index 992db96..5e9b2b4 100644 (file)
@@ -23,6 +23,7 @@
 #define _LMV_INTERNAL_H_
 
 #include <lustre/lustre_idl.h>
+#include <obd.h>
 
 #ifndef __KERNEL__
 /* XXX: dirty hack, needs to be fixed more clever way. */
@@ -133,7 +134,7 @@ int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **,
 int lmv_handle_split(struct obd_export *, const struct lu_fid *);
 int lmv_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
                     void *, int);
-int lmv_fld_lookup(struct obd_device *obd, const struct lu_fid *fid,
+int lmv_fld_lookup(struct lmv_obd *lmv, const struct lu_fid *fid,
                    mdsno_t *mds);
 
 static inline struct lmv_stripe_md * 
@@ -166,6 +167,19 @@ static inline int lmv_get_easize(struct lmv_obd *lmv)
                 sizeof(struct lu_fid);
 }
 
+static inline struct obd_export *
+lmv_get_export(struct lmv_obd *lmv, const struct lu_fid *fid)
+{
+        mdsno_t mds;
+        int rc;
+        
+        rc = lmv_fld_lookup(lmv, fid, &mds);
+        if (rc)
+                return ERR_PTR(rc);
+
+        return lmv->tgts[mds].ltd_exp;
+}
+
 /* lproc_lmv.c */
 extern struct file_operations lmv_proc_target_fops;
 
index b32251d..3d4713b 100644 (file)
@@ -681,7 +681,7 @@ static int lmv_placement_policy(struct obd_device *obd,
                 } else {
                         /* default policy is to use parent MDS */
                         LASSERT(fid_is_sane(hint->ph_pfid));
-                        rc = lmv_fld_lookup(obd, hint->ph_pfid, mds);
+                        rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
                 }
         } else {
                 /* sequences among all tgts are not well balanced, allocate new
@@ -968,16 +968,14 @@ static int lmv_getstatus(struct obd_export *exp,
         RETURN(rc);
 }
 
-static int lmv_getxattr(struct obd_export *exp,
-                        const struct lu_fid *fid,
-                        obd_valid valid, const char *name,
-                        const char *input, int input_size,
-                        int output_size, int flags,
+static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
+                        obd_valid valid, const char *name, const char *input,
+                        int input_size, int output_size, int flags,
                         struct ptlrpc_request **request)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        mdsno_t mds;
+        struct obd_export *tgt_exp;
         int rc;
         ENTRY;
 
@@ -985,26 +983,24 @@ static int lmv_getxattr(struct obd_export *exp,
         if (rc)
                 RETURN(rc);
 
-        rc = lmv_fld_lookup(obd, fid, &mds);
-        if (rc)
-                RETURN(rc);
+        tgt_exp = lmv_get_export(lmv, fid);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
 
-        rc = md_getxattr(lmv->tgts[mds].ltd_exp, fid, valid, name,
-                         input, input_size, output_size, flags, request);
+        rc = md_getxattr(tgt_exp, fid, valid, name, input, input_size,
+                         output_size, flags, request);
 
         RETURN(rc);
 }
 
-static int lmv_setxattr(struct obd_export *exp,
-                        const struct lu_fid *fid,
-                        obd_valid valid, const char *name,
-                        const char *input, int input_size,
-                        int output_size, int flags,
+static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
+                        obd_valid valid, const char *name, const char *input,
+                        int input_size, int output_size, int flags,
                         struct ptlrpc_request **request)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        mdsno_t mds;
+        struct obd_export *tgt_exp;
         int rc;
         ENTRY;
 
@@ -1012,25 +1008,24 @@ static int lmv_setxattr(struct obd_export *exp,
         if (rc)
                 RETURN(rc);
 
-        rc = lmv_fld_lookup(obd, fid, &mds);
-        if (rc)
-                RETURN(rc);
+        tgt_exp = lmv_get_export(lmv, fid);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
 
-        rc = md_setxattr(lmv->tgts[mds].ltd_exp, fid, valid, name,
+        rc = md_setxattr(tgt_exp, fid, valid, name,
                          input, input_size, output_size, flags, request);
         
         RETURN(rc);
 }
 
-static int lmv_getattr(struct obd_export *exp,
-                       const struct lu_fid *fid,
+static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
                        obd_valid valid, int ea_size,
                        struct ptlrpc_request **request)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
+        struct obd_export *tgt_exp;
         struct lmv_obj *obj;
-        mdsno_t mds;
         int rc, i;
         ENTRY;
 
@@ -1038,12 +1033,11 @@ static int lmv_getattr(struct obd_export *exp,
         if (rc)
                 RETURN(rc);
 
-        rc = lmv_fld_lookup(obd, fid, &mds);
-        if (rc)
-                RETURN(rc);
+        tgt_exp = lmv_get_export(lmv, fid);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
 
-        rc = md_getattr(lmv->tgts[mds].ltd_exp, fid, valid,
-                        ea_size, request);
+        rc = md_getattr(tgt_exp, fid, valid, ea_size, request);
         if (rc)
                 RETURN(rc);
 
@@ -1124,7 +1118,7 @@ static int lmv_close(struct obd_export *exp,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        mdsno_t mds;
+        struct obd_export *tgt_exp;
         int rc;
         ENTRY;
 
@@ -1132,12 +1126,12 @@ static int lmv_close(struct obd_export *exp,
         if (rc)
                 RETURN(rc);
 
-        rc = lmv_fld_lookup(obd, &op_data->fid1, &mds);
-        if (rc)
-                RETURN(rc);
+        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
 
         CDEBUG(D_OTHER, "CLOSE "DFID"\n", PFID(&op_data->fid1));
-        rc = md_close(lmv->tgts[mds].ltd_exp, op_data, och, request);
+        rc = md_close(tgt_exp, op_data, och, request);
         RETURN(rc);
 }
 
@@ -1148,10 +1142,10 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         struct ptlrpc_request *req = NULL;
+        struct obd_export *tgt_exp;
         struct lmv_obj *obj;
         struct lustre_md md;
         int mealen, rc;
-        mdsno_t mds;
         __u64 valid;
         ENTRY;
 
@@ -1160,20 +1154,18 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
 
         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
 
-        rc = lmv_fld_lookup(obd, fid, &mds);
-        if (rc)
-                RETURN(rc);
+        tgt_exp = lmv_get_export(lmv, fid);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
 
         /* time to update mea of parent fid */
-        rc = md_getattr(lmv->tgts[mds].ltd_exp, fid, valid,
-                        mealen, &req);
+        rc = md_getattr(tgt_exp, fid, valid, mealen, &req);
         if (rc) {
                 CERROR("md_getattr() failed, error %d\n", rc);
                 GOTO(cleanup, rc);
         }
 
-        rc = md_get_lustre_md(lmv->tgts[mds].ltd_exp, req, 0,
-                              NULL, &md);
+        rc = md_get_lustre_md(tgt_exp, req, 0, NULL, &md);
         if (rc) {
                 CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
                 GOTO(cleanup, rc);
@@ -1204,10 +1196,10 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
+        struct obd_export *tgt_exp;
         struct mdt_body *body;
         struct lmv_obj *obj;
         int rc, loop = 0;
-        mdsno_t mds;
         ENTRY;
 
         rc = lmv_check_connect(obd);
@@ -1220,6 +1212,8 @@ repeat:
         LASSERT(++loop <= 2);
         obj = lmv_obj_grab(obd, &op_data->fid1);
         if (obj) {
+                mdsno_t mds;
+                
                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                    op_data->name, op_data->namelen);
                 op_data->fid1 = obj->lo_inodes[mds].li_fid;
@@ -1229,12 +1223,12 @@ repeat:
         CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->namelen,
                op_data->name, PFID(&op_data->fid1));
 
-        rc = lmv_fld_lookup(obd, &op_data->fid1, &mds);
-        if (rc)
-                RETURN(rc);
+        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
 
-        rc = md_create(lmv->tgts[mds].ltd_exp, op_data, data, datalen,
-                       mode, uid, gid, cap_effective, rdev, request);
+        rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid,
+                       cap_effective, rdev, request);
         if (rc == 0) {
                 if (*request == NULL)
                         RETURN(rc);
@@ -1262,7 +1256,7 @@ static int lmv_done_writing(struct obd_export *exp,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        mdsno_t mds;
+        struct obd_export *tgt_exp;
         int rc;
         ENTRY;
 
@@ -1270,10 +1264,11 @@ static int lmv_done_writing(struct obd_export *exp,
         if (rc)
                 RETURN(rc);
 
-        rc = lmv_fld_lookup(obd, &op_data->fid1, &mds);
-        if (rc)
-                RETURN(rc);
-        rc = md_done_writing(lmv->tgts[mds].ltd_exp, op_data);
+        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
+
+        rc = md_done_writing(tgt_exp, op_data);
         RETURN(rc);
 }
 
@@ -1288,8 +1283,8 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype,
         struct lmv_obd *lmv = &obd->u.lmv;
         struct lmv_stripe_md *mea = op_data->mea1;
         struct md_op_data *op_data2;
+        struct obd_export *tgt_exp;
         int i, rc = 0;
-        mdsno_t mds;
         ENTRY;
 
         OBD_ALLOC_PTR(op_data2);
@@ -1300,21 +1295,24 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype,
         for (i = 0; i < mea->mea_count; i++) {
                 memset(op_data2, 0, sizeof(*op_data2));
                 op_data2->fid1 = mea->mea_ids[i];
-                rc = lmv_fld_lookup(obd, &op_data2->fid1, &mds);
-                if (rc)
-                        GOTO(cleanup, rc);
 
-                if (lmv->tgts[mds].ltd_exp == NULL)
+                tgt_exp = lmv_get_export(lmv, &op_data2->fid1);
+                if (IS_ERR(tgt_exp))
+                        GOTO(cleanup, rc = PTR_ERR(tgt_exp));
+
+                if (tgt_exp == NULL)
                         continue;
 
-                rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it,
-                                lockmode, op_data2, lockh + i, lmm, lmmsize,
-                                cb_compl, cb_blocking, cb_data, 0);
+                rc = md_enqueue(tgt_exp, locktype, it, lockmode, op_data2,
+                                lockh + i, lmm, lmmsize, cb_compl, cb_blocking,
+                                cb_data, 0);
 
                 CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
                        PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
+
                 if (rc)
                         GOTO(cleanup, rc);
+                
                 if (it->d.lustre.it_data) {
                         struct ptlrpc_request *req;
                         req = (struct ptlrpc_request *)it->d.lustre.it_data;
@@ -1353,9 +1351,9 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type,
         struct lmv_obd *lmv = &obd->u.lmv;
         struct mdt_body *body = NULL;
         struct lustre_handle plock;
+        struct obd_export *tgt_exp;
         struct md_op_data *rdata;
         int rc = 0, pmode;
-        mdsno_t mds;
         ENTRY;
 
         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
@@ -1384,13 +1382,13 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type,
         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
         ptlrpc_req_finished(req);
 
-        rc = lmv_fld_lookup(obd, &rdata->fid1, &mds);
-        if (rc)
-                GOTO(out_free_rdata, rc);
-        rc = md_enqueue(lmv->tgts[mds].ltd_exp,
-                        lock_type, it, lock_mode, rdata, lockh, lmm,
-                        lmmsize, cb_compl, cb_blocking, cb_data,
-                        extra_lock_flags);
+        tgt_exp = lmv_get_export(lmv, &rdata->fid1);
+        if (IS_ERR(tgt_exp))
+                GOTO(out_free_rdata, rc = PTR_ERR(tgt_exp));
+
+        rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata,
+                        lockh, lmm, lmmsize, cb_compl, cb_blocking,
+                        cb_data, extra_lock_flags);
         ldlm_lock_decref(&plock, pmode);
 
         EXIT;
@@ -1409,8 +1407,8 @@ lmv_enqueue(struct obd_export *exp, int lock_type,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
+        struct obd_export *tgt_exp;
         struct lmv_obj *obj;
-        mdsno_t mds;
         int rc;
         ENTRY;
 
@@ -1428,6 +1426,8 @@ lmv_enqueue(struct obd_export *exp, int lock_type,
         if (op_data->namelen) {
                 obj = lmv_obj_grab(obd, &op_data->fid1);
                 if (obj) {
+                        mdsno_t mds;
+                        
                         /* directory is split. look for right mds for this
                          * name */
                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
@@ -1439,13 +1439,14 @@ lmv_enqueue(struct obd_export *exp, int lock_type,
         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
                PFID(&op_data->fid1));
 
-        rc = lmv_fld_lookup(obd, &op_data->fid1, &mds);
-        if (rc)
-                RETURN(rc);
-        rc = md_enqueue(lmv->tgts[mds].ltd_exp,
-                        lock_type, it, lock_mode, op_data, lockh, lmm,
-                        lmmsize, cb_compl, cb_blocking, cb_data,
+        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
+
+        rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh,
+                        lmm, lmmsize, cb_compl, cb_blocking, cb_data,
                         extra_lock_flags);
+        
         if (rc == 0 && it->it_op == IT_OPEN)
                 rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
                                         op_data, lockh, lmm, lmmsize,
@@ -1461,10 +1462,11 @@ lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
+        struct obd_export *tgt_exp;
         struct lu_fid rid = *fid;
-        int rc, loop = 0;
         struct mdt_body *body;
         struct lmv_obj *obj;
+        int rc, loop = 0;
         mdsno_t mds;
         ENTRY;
 
@@ -1472,7 +1474,7 @@ lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
        if (rc)
                RETURN(rc);
 
-        rc = lmv_fld_lookup(obd, fid, &mds);
+        rc = lmv_fld_lookup(lmv, fid, &mds);
         if (rc)
                 RETURN(rc);
 repeat:
@@ -1489,31 +1491,32 @@ repeat:
         CDEBUG(D_OTHER, "getattr_lock for %*s on "DFID" -> "DFID"\n",
                namelen, filename, PFID(fid), PFID(&rid));
 
-        rc = lmv_fld_lookup(obd, &rid, &mds);
-        if (rc)
-                RETURN(rc);
+        tgt_exp = lmv_get_export(lmv, &rid);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
 
-        rc = md_getattr_name(lmv->tgts[mds].ltd_exp,
-                             &rid, filename, namelen,
-                             valid, ea_size, request);
+        rc = md_getattr_name(tgt_exp, &rid, filename, namelen, valid,
+                             ea_size, request);
         if (rc == 0) {
-                body = lustre_msg_buf((*request)->rq_repmsg, REQ_REC_OFF, sizeof(*body));
+                body = lustre_msg_buf((*request)->rq_repmsg,
+                                      REQ_REC_OFF, sizeof(*body));
                 LASSERT(body != NULL);
 
                 if (body->valid & OBD_MD_MDS) {
                         struct ptlrpc_request *req = NULL;
 
                         rid = body->fid1;
-                        CDEBUG(D_OTHER, "request attrs for "DFID"\n", PFID(&rid));
+                        CDEBUG(D_OTHER, "request attrs for "DFID"\n",
+                               PFID(&rid));
 
-                        rc = lmv_fld_lookup(obd, &rid, &mds);
-                        if (rc) {
+                        tgt_exp = lmv_get_export(lmv, &rid);
+                        if (IS_ERR(tgt_exp)) {
                                 ptlrpc_req_finished(*request);
-                                RETURN(rc);
+                                RETURN(PTR_ERR(tgt_exp));
                         }
                         
-                        rc = md_getattr_name(lmv->tgts[mds].ltd_exp,
-                                             &rid, NULL, 1, valid, ea_size, &req);
+                        rc = md_getattr_name(tgt_exp, &rid, NULL, 1, valid,
+                                             ea_size, &req);
                         ptlrpc_req_finished(*request);
                         *request = req;
                 }
@@ -1557,7 +1560,7 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
                         lmv_obj_put(obj);
                 }
 
-                rc = lmv_fld_lookup(obd, &op_data->fid2, &mds);
+                rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
                 if (rc)
                         RETURN(rc);
 
@@ -1565,7 +1568,7 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
                        PFID(&op_data->fid2), op_data->namelen,
                        op_data->name, PFID(&op_data->fid1));
         } else {
-                rc = lmv_fld_lookup(obd, &op_data->fid1, &mds);
+                rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
                 if (rc)
                         RETURN(rc);
 
@@ -1610,7 +1613,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
                        "to "DFID"\n", newlen, new, oldlen, newlen,
                        PFID(&op_data->fid2), PFID(&op_data->fid1));
 
-                rc = lmv_fld_lookup(obd, &op_data->fid2, &mds);
+                rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
                 if (rc)
                         RETURN(rc);
 
@@ -1659,13 +1662,12 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
                 lmv_obj_put(obj);
         }
 
-        rc = lmv_fld_lookup(obd, &op_data->fid1, &mds);
+        rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
         if (rc)
                 RETURN(rc);
 
-
 request:
-        rc = lmv_fld_lookup(obd, &op_data->fid2, &mds2);
+        rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds2);
         if (rc)
                 RETURN(rc);
 
@@ -1687,10 +1689,10 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         struct ptlrpc_request *req;
+        struct obd_export *tgt_exp;
         struct mdt_body *body;
         struct lmv_obj *obj;
         int rc = 0, i;
-        mdsno_t mds;
         ENTRY;
 
         rc = lmv_check_connect(obd);
@@ -1706,13 +1708,14 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
                 for (i = 0; i < obj->lo_objcount; i++) {
                         op_data->fid1 = obj->lo_inodes[i].li_fid;
 
-                        rc = lmv_fld_lookup(obd, &op_data->fid1, &mds);
-                        if (rc)
+                        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+                        if (IS_ERR(tgt_exp)) {
+                                rc = PTR_ERR(tgt_exp);
                                 break;
+                        }
 
-                        rc = md_setattr(lmv->tgts[mds].ltd_exp,
-                                        op_data, iattr, ea, ealen, ea2,
-                                        ea2len, &req);
+                        rc = md_setattr(tgt_exp, op_data, iattr, ea, ealen,
+                                        ea2, ea2len, &req);
 
                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) {
                                 /*
@@ -1729,12 +1732,12 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
                 }
                 lmv_obj_put(obj);
         } else {
-                rc = lmv_fld_lookup(obd, &op_data->fid1, &mds);
-                if (rc)
-                        RETURN(rc);
+                tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+                if (IS_ERR(tgt_exp))
+                        RETURN(PTR_ERR(tgt_exp));
 
-                rc = md_setattr(lmv->tgts[mds].ltd_exp, op_data, iattr, ea,
-                                ealen, ea2, ea2len, request);
+                rc = md_setattr(tgt_exp, op_data, iattr, ea, ealen, ea2,
+                                ea2len, request);
                 if (rc == 0) {
                         body = lustre_msg_buf((*request)->rq_repmsg, REQ_REC_OFF,
                                               sizeof(*body));
@@ -1749,7 +1752,7 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        mdsno_t mds;
+        struct obd_export *tgt_exp;
         int rc;
         ENTRY;
 
@@ -1757,11 +1760,11 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
        if (rc)
                RETURN(rc);
 
-        rc = lmv_fld_lookup(obd, fid, &mds);
-        if (rc)
-                RETURN(rc);
-        rc = md_sync(lmv->tgts[mds].ltd_exp,
-                     fid, request);
+        tgt_exp = lmv_get_export(lmv, fid);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
+
+        rc = md_sync(tgt_exp, fid, request);
         RETURN(rc);
 }
 
@@ -1805,6 +1808,8 @@ int lmv_blocking_ast(struct ldlm_lock *lock,
         RETURN(0);
 }
 
+#if 0
+/* not needed for CMD3 because only dir on master has "." and ".." */
 static void lmv_remove_dots(struct page *page)
 {
         unsigned limit = PAGE_CACHE_SIZE;
@@ -1821,6 +1826,7 @@ static void lmv_remove_dots(struct page *page)
                         p->inode = 0;
         }
 }
+#endif
 
 static int lmv_readpage(struct obd_export *exp,
                         const struct lu_fid *fid,
@@ -1829,9 +1835,9 @@ static int lmv_readpage(struct obd_export *exp,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
+        struct obd_export *tgt_exp;
         struct lu_fid rid = *fid;
         struct lmv_obj *obj;
-        mdsno_t mds;
         int i, rc;
         ENTRY;
 
@@ -1861,24 +1867,25 @@ static int lmv_readpage(struct obd_export *exp,
                        PFID(&rid), (unsigned long)offset);
         }
         
-        rc = lmv_fld_lookup(obd, &rid, &mds);
-        if (rc)
-                RETURN(rc);
-        
-        rc = md_readpage(lmv->tgts[mds].ltd_exp, &rid,
-                         offset, page, request);
+        tgt_exp = lmv_get_export(lmv, &rid);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
 
-        if (0 && rc == 0 && !lu_fid_eq(&rid, fid))
+        rc = md_readpage(tgt_exp, &rid, offset, page, request);
+
+#if 0
+        if (rc == 0 && !lu_fid_eq(&rid, fid))
                 /*
                  * This page isn't from master object. To avoid "." and ".."
                  * duplication in directory, we have to remove them from all
                  * slave objects
                  *
-                 * XXX this is not needed for cmd3 readdir, because only
-                 * master directory has dot and dotdot.
+                 * XXX this is not needed for cmd3 readdir, because only master
+                 * directory has dot and dotdot.
                  */
                 lmv_remove_dots(page);
-
+#endif
+        
         RETURN(rc);
 }
 
@@ -1890,8 +1897,8 @@ static int lmv_unlink_slaves(struct obd_export *exp,
         struct lmv_obd *lmv = &obd->u.lmv;
         struct lmv_stripe_md *mea = op_data->mea1;
         struct md_op_data *op_data2;
+        struct obd_export *tgt_exp;
         int i, rc = 0;
-        mdsno_t mds;
         ENTRY;
 
         OBD_ALLOC_PTR(op_data2);
@@ -1904,14 +1911,14 @@ static int lmv_unlink_slaves(struct obd_export *exp,
                 op_data2->fid1 = mea->mea_ids[i];
                 op_data2->create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
 
-                rc = lmv_fld_lookup(obd, &op_data2->fid1, &mds);
-                if (rc)
-                        GOTO(out_free_op_data2, rc);
-                if (lmv->tgts[mds].ltd_exp == NULL)
+                tgt_exp = lmv_get_export(lmv, &op_data2->fid1);
+                if (IS_ERR(tgt_exp))
+                        GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp));
+
+                if (tgt_exp == NULL)
                         continue;
 
-                rc = md_unlink(lmv->tgts[mds].ltd_exp,
-                               op_data2, req);
+                rc = md_unlink(tgt_exp, op_data2, req);
 
                 CDEBUG(D_OTHER, "unlink slave "DFID" -> %d\n",
                        PFID(&mea->mea_ids[i]), rc);
@@ -1935,7 +1942,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        mdsno_t mds;
+        struct obd_export *tgt_exp;
         int rc, i;
         ENTRY;
 
@@ -1966,10 +1973,11 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
                 CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n",
                        PFID(&op_data->fid1));
         }
-        rc = lmv_fld_lookup(obd, &op_data->fid1, &mds);
-        if (rc)
-                RETURN(rc);
-        rc = md_unlink(lmv->tgts[mds].ltd_exp, op_data, request);
+        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
+        
+        rc = md_unlink(tgt_exp, op_data, request);
         RETURN(rc);
 }
 
@@ -2216,18 +2224,18 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
 
         if (keylen == strlen("ids") && memcmp(key, "ids", keylen) == 0) {
                 struct lu_fid *fid = (struct lu_fid *)val;
-                mdsno_t mds;
+                struct obd_export *tgt_exp;
 
                 rc = lmv_check_connect(obd);
                 if (rc)
                         RETURN(rc);
 
-                rc = lmv_fld_lookup(obd, fid, &mds);
-                if (rc)
-                        RETURN(rc);
-                rc = obd_set_info_async(lmv->tgts[mds].ltd_exp,
-                                        keylen, key, vallen, val,
-                                        set);
+                tgt_exp = lmv_get_export(lmv, fid);
+                if (IS_ERR(tgt_exp))
+                        RETURN(PTR_ERR(tgt_exp));
+
+                rc = obd_set_info_async(tgt_exp, keylen, key, vallen,
+                                        val, set);
                 RETURN(rc);
         }
 
index 274e7b1..93d8709 100644 (file)
@@ -286,10 +286,10 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid,
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         struct ptlrpc_request *req = NULL;
+        struct obd_export *tgt_exp;
         struct lmv_obj *obj;
         struct lustre_md md;
         int mealen, rc;
-        mdsno_t mds;
         ENTRY;
 
         CDEBUG(D_OTHER, "get mea for "DFID" and create lmv obj\n",
@@ -307,11 +307,11 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid,
                 md.mea = NULL;
                 valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
 
-                rc = lmv_fld_lookup(obd, fid, &mds);
-                if (rc)
-                        GOTO(cleanup, obj = ERR_PTR(rc));
+                tgt_exp = lmv_get_export(lmv, fid);
+                if (IS_ERR(tgt_exp))
+                        GOTO(cleanup, obj = (void *)tgt_exp);
 
-                rc = md_getattr(lmv->tgts[mds].ltd_exp, fid, valid, mealen, &req);
+                rc = md_getattr(tgt_exp, fid, valid, mealen, &req);
                 if (rc) {
                         CERROR("md_getattr() failed, error %d\n", rc);
                         GOTO(cleanup, obj = ERR_PTR(rc));