Whamcloud - gitweb
- fixed ma_valid in cmm_mdsnum_check();
authoryury <yury>
Sat, 28 Oct 2006 11:28:05 +0000 (11:28 +0000)
committeryury <yury>
Sat, 28 Oct 2006 11:28:05 +0000 (11:28 +0000)
- fixes in cmm_try_to_split();
- added error messages to cmm_try_to_split();
- cache mdsnum for lmv slaves so that we need not to lookup them all the time. This reduces number of fld cache lookups from ~46000 to ~12000 on sanity 24o.

lustre/cmm/cmm_internal.h
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c
lustre/fld/fld_cache.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/lmv/lmv_object.c
lustre/mdd/mdd_lov.c
lustre/mdd/mdd_object.c

index def7915..41f2e94 100644 (file)
@@ -45,7 +45,7 @@ enum {
 };
 
 struct cmm_device {
-        struct md_device       cmm_md_dev;
+        struct md_device      cmm_md_dev;
         /* device flags, taken from enum cmm_flags */
         __u32                 cmm_flags;
         /* underlaying device in MDS stack, usually MDD */
index 793e078..04edafc 100644 (file)
@@ -379,20 +379,18 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env,
         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
         int rc, split;
         ENTRY;
-        
-        memset(ma, 0, sizeof(*ma));
 
         /*
-         * Check only if we need protection from split. If not - mdt
-         * handles other cases.
+         * Check only if we need protection from split. If not - mdt handles
+         * other cases.
          */
-        rc = cmm_expect_splitting(env, mo, ma, &split);
+        rc = cmm_expect_splitting(env, mo, ma, &split); 
         if (rc) {
                 CERROR("Can't check for possible split, error %d\n",
                        rc);
                 RETURN(MDL_MINMODE);
         }
-
+        
         /*
          * Do not take PDO lock on non-splittable objects if this is not PW,
          * this should speed things up a bit.
@@ -401,11 +399,8 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env,
                 RETURN(MDL_NL);
 
         /* Protect splitting by exclusive lock. */
-        if (split == CMM_EXPECT_SPLIT && lm == MDL_PW) {
-                CDEBUG(D_INFO|D_WARNING, "Going to split "DFID"\n",
-                       PFID(lu_object_fid(&mo->mo_lu)));
+        if (split == CMM_EXPECT_SPLIT && lm == MDL_PW)
                 RETURN(MDL_EX);
-        }
 
         /* Have no idea about lock mode, let it be what higher layer wants. */
         RETURN(MDL_MINMODE);
index 5fec095..baf7ba0 100644 (file)
@@ -72,6 +72,13 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
         if (ma->ma_valid & MA_LMV) {
                 int stripe;
 
+                /* 
+                 * Clean MA_LMV in ->ma_valid because mdd will do nothing
+                 * counting that EA is already taken.
+                 */
+                ma->ma_valid &= ~MA_LMV;
+
+                LASSERT(ma->ma_lmv_size > 0);
                 OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
                 if (ma->ma_lmv == NULL)
                         RETURN(-ENOMEM);
@@ -125,6 +132,7 @@ int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
         }
 
         /* MA_INODE is needed to check inode size. */
+        memset(ma, 0, sizeof(*ma));
         ma->ma_need = MA_INODE | MA_LMV;
         rc = mo_attr_get(env, mo, ma);
         if (rc)
@@ -508,15 +516,14 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
         ENTRY;
 
         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
-        memset(ma, 0, sizeof(*ma));
 
         /* Step1: Checking whether the dir needs to be split. */
         rc = cmm_expect_splitting(env, mo, ma, &split);
         if (rc)
-                GOTO(cleanup, rc);
+                RETURN(rc);
         
         if (split != CMM_EXPECT_SPLIT)
-                GOTO(cleanup, rc = 0);
+                RETURN(0);
 
         LASSERTF(mo->mo_pdo_mode == MDL_EX, "Split is only valid if "
                  "dir is protected by MDL_EX lock. Lock mode 0x%x\n",
@@ -527,18 +534,24 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
          * this one ops, confilct with current recovery design.
          */
         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
-        if (rc)
-                GOTO(cleanup, rc = 0);
+        if (rc) {
+                CERROR("Can't disable trans for split, rc %d\n", rc);
+                RETURN(rc);
+        }
 
         /* Step2: Create slave objects (on slave MDTs) */
         rc = cmm_slaves_create(env, mo, ma);
-        if (rc)
-                GOTO(cleanup, ma);
+        if (rc) {
+                CERROR("Can't create slaves for split, rc %d\n", rc);
+                GOTO(cleanup, rc);
+        }
 
         /* Step3: Scan and split the object. */
         rc = cmm_scan_and_split(env, mo, ma);
-        if (rc)
-                GOTO(cleanup, ma);
+        if (rc) {
+                CERROR("Can't scan and split, rc %d\n", rc);
+                GOTO(cleanup, rc);
+        }
 
         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
         
@@ -549,6 +562,9 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
                 CWARN("Dir "DFID" has been split\n",
                       PFID(lu_object_fid(&mo->mo_lu)));
                 rc = -ERESTART;
+        } else {
+                CERROR("Can't set MEA to master dir, "
+                       "rc %d\n", rc);
         }
         EXIT;
 cleanup:
index 86ea531..26a68a7 100644 (file)
@@ -166,8 +166,8 @@ fld_cache_bucket(struct fld_cache *cache, seqno_t seq)
 
 /*
  * Check if cache needs to be shrinked. If so - do it. Tries to keep all
- * collision lists well balanced. That is, checks all of them and removes one
- * entry in list and so on.
+ * collision lists well balanced. That is, check all of them and remove one
+ * entry in list and so on until cache is shrinked enough.
  */
 static int fld_cache_shrink(struct fld_cache *cache)
 {
index 16f768f..f771763 100644 (file)
@@ -101,7 +101,7 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm,
 
         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
 
-        tgt_exp = lmv_get_export(lmv, &body->fid1);
+        tgt_exp = lmv_find_export(lmv, &body->fid1);
         if (IS_ERR(tgt_exp))
                 GOTO(out, rc = PTR_ERR(tgt_exp));
 
@@ -157,10 +157,8 @@ int lmv_alloc_fid_for_split(struct obd_device *obd, struct lu_fid *pid,
         mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                (char *)op->name, op->namelen);
         rpid = &obj->lo_inodes[mea_idx].li_fid;
-        rc = lmv_fld_lookup(lmv, rpid, &mds);
+        mds = obj->lo_inodes[mea_idx].li_mds;
         lmv_obj_put(obj);
-        if (rc)
-                RETURN(rc);
 
         rc = __lmv_fid_alloc(lmv, fid, mds);
         if (rc) {
@@ -217,11 +215,12 @@ repeat:
                                        op_data->namelen);
 
                 rpid = obj->lo_inodes[mea_idx].li_fid;
+                tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
                 lmv_obj_put(obj);
                 CDEBUG(D_OTHER, "Choose slave dir ("DFID")\n", PFID(&rpid));
+        } else {
+                tgt_exp = lmv_find_export(lmv, &rpid);
         }
-
-        tgt_exp = lmv_get_export(lmv, &rpid);
         if (IS_ERR(tgt_exp))
                 GOTO(out_free_sop_data, rc = PTR_ERR(tgt_exp));
         
@@ -267,11 +266,10 @@ repeat:
                                cb_blocking, extra_lock_flags);
         if (rc != 0) {
                 LASSERT(rc < 0);
-
                 /*
                  * This is possible, that some userspace application will try to
                  * open file as directory and we will have -ENOTDIR here. As
-                 * this is "usual" situation, we should not print error here,
+                 * this is normal situation, we should not print error here,
                  * only debug info.
                  */
                 CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):"
@@ -373,11 +371,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
                 if (obj) {
                         if (!lu_fid_eq(&op_data->fid1, &op_data->fid2)){
                                 rpid = obj->lo_inodes[mds].li_fid;
-                                rc = lmv_fld_lookup(lmv, &rpid, &mds);
-                                if (rc) {
-                                        lmv_obj_put(obj);
-                                        GOTO(out_free_sop_data, rc);
-                                }
+                                mds = obj->lo_inodes[mds].li_mds;
                         }
                         lmv_obj_put(obj);
                 }
@@ -398,15 +392,15 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
                                                (char *)op_data->name,
                                                op_data->namelen);
                         rpid = obj->lo_inodes[mea_idx].li_fid;
-                        rc = lmv_fld_lookup(lmv, &rpid, &mds);
-                        if (rc) {
-                                lmv_obj_put(obj);
-                                GOTO(out_free_sop_data, rc);
-                        }
+                        mds = obj->lo_inodes[mea_idx].li_mds;
                         lmv_obj_put(obj);
 
                         CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n",
                                mds, PFID(&rpid));
+                } else {
+                        rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
+                        if (rc)
+                                GOTO(out_free_sop_data, rc);
                 }
         }
 
@@ -563,7 +557,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 op_data->fid1 = fid;
                 op_data->fid2 = fid;
 
-                tgt_exp = lmv_get_export(lmv, &fid);
+                tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
                 if (IS_ERR(tgt_exp))
                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
 
@@ -662,18 +656,17 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
                                                (char *)op_data->name,
                                                op_data->namelen);
                         rpid = obj->lo_inodes[mea_idx].li_fid;
+                        mds = obj->lo_inodes[mea_idx].li_mds;
                         lmv_obj_put(obj);
+                } else {
+                        rc = lmv_fld_lookup(lmv, &rpid, &mds);
+                        if (rc)
+                                GOTO(out_free_sop_data, rc);
                 }
-                rc = lmv_fld_lookup(lmv, &rpid, &mds);
-                if (rc)
-                        GOTO(out_free_sop_data, rc);
 
                 CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n",
                        PFID(&op_data->fid2), mds);
         } else {
-                rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
-                if (rc)
-                        GOTO(out_free_sop_data, rc);
 repeat:
                 LASSERT(++loop <= 2);
 
@@ -691,13 +684,13 @@ repeat:
                                                        (char *)op_data->name,
                                                        op_data->namelen);
                                 rpid = obj->lo_inodes[mea_idx].li_fid;
-                                rc = lmv_fld_lookup(lmv, &rpid, &mds);
-                                if (rc) {
-                                        lmv_obj_put(obj);
-                                        GOTO(out_free_sop_data, rc);
-                                }
+                                mds = obj->lo_inodes[mea_idx].li_mds;
                         }
                         lmv_obj_put(obj);
+                } else {
+                        rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
+                        if (rc)
+                                GOTO(out_free_sop_data, rc);
                 }
                 fid_zero(&op_data->fid2);
         }
@@ -897,7 +890,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                 op_data->fid2 = fid;
 
                 /* is obj valid? */
-                tgt_exp = lmv_get_export(lmv, &fid);
+                tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
                 if (IS_ERR(tgt_exp))
                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
 
index 4e5b397..cb60b10 100644 (file)
@@ -44,6 +44,7 @@ struct qstr {
 
 struct lmv_inode {
         struct lu_fid      li_fid;        /* id of dirobj */
+        mdsno_t            li_mds;        /* cached mdsno where @li_fid lives */
         unsigned long      li_size;       /* slave size value */
         int                li_flags;
 };
@@ -174,7 +175,13 @@ static inline int lmv_get_easize(struct lmv_obd *lmv)
 }
 
 static inline struct obd_export *
-lmv_get_export(struct lmv_obd *lmv, const struct lu_fid *fid)
+lmv_get_export(struct lmv_obd *lmv, mdsno_t mds)
+{
+        return lmv->tgts[mds].ltd_exp;
+}
+
+static inline struct obd_export *
+lmv_find_export(struct lmv_obd *lmv, const struct lu_fid *fid)
 {
         mdsno_t mds;
         int rc;
@@ -183,7 +190,7 @@ lmv_get_export(struct lmv_obd *lmv, const struct lu_fid *fid)
         if (rc)
                 return ERR_PTR(rc);
 
-        return lmv->tgts[mds].ltd_exp;
+        return lmv_get_export(lmv, mds);
 }
 
 /* lproc_lmv.c */
index 3c3df17..e88aa93 100644 (file)
@@ -760,22 +760,22 @@ static int lmv_placement_policy(struct obd_device *obd,
                  */
                 obj = lmv_obj_grab(obd, hint->ph_pfid);
                 if (obj) {
+                        struct lu_fid *rpid;
+                        int mea_idx;
+
                         /*
                          * If the dir got split, alloc fid according to its
                          * hash. No matter what we create, object create should
-                         * go to correct MDS. 
+                         * go to correct MDS.
                          */
-                        struct lu_fid *rpid;
-                        int mea_idx;
                         mea_idx = raw_name2idx(obj->lo_hashtype,
                                                obj->lo_objcount,
                                                hint->ph_cname->name,
                                                hint->ph_cname->len);
                         rpid = &obj->lo_inodes[mea_idx].li_fid;
-                        rc = lmv_fld_lookup(lmv, rpid, mds);
+                        *mds = obj->lo_inodes[mea_idx].li_mds;
                         lmv_obj_put(obj);
-                        if (rc)
-                                GOTO(exit, rc);
+                        rc = 0;
                                 
                         CDEBUG(D_INODE, "The obj "DFID" has been split, got "
                                "MDS at "LPU64" by name %s\n", PFID(hint->ph_pfid),
@@ -788,7 +788,7 @@ static int lmv_placement_policy(struct obd_device *obd,
                 } else {
                         /*
                          * Default policy for others is to use parent MDS.
-                         * ONLY directories can be cross-ref during creation
+                         * ONLY directories can be cross-ref during creation.
                          */
                         rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
                 }
@@ -1078,7 +1078,7 @@ static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
         if (rc)
                 RETURN(rc);
 
-        tgt_exp = lmv_get_export(lmv, fid);
+        tgt_exp = lmv_find_export(lmv, fid);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
@@ -1103,7 +1103,7 @@ static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
         if (rc)
                 RETURN(rc);
 
-        tgt_exp = lmv_get_export(lmv, fid);
+        tgt_exp = lmv_find_export(lmv, fid);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
@@ -1128,7 +1128,7 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
         if (rc)
                 RETURN(rc);
 
-        tgt_exp = lmv_get_export(lmv, fid);
+        tgt_exp = lmv_find_export(lmv, fid);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
@@ -1219,7 +1219,7 @@ static int lmv_close(struct obd_export *exp,
         if (rc)
                 RETURN(rc);
 
-        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+        tgt_exp = lmv_find_export(lmv, &op_data->fid1);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
@@ -1249,7 +1249,7 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
 
         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
 
-        tgt_exp = lmv_get_export(lmv, fid);
+        tgt_exp = lmv_find_export(lmv, fid);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
@@ -1311,16 +1311,18 @@ repeat:
                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                        op_data->name, op_data->namelen);
                 op_data->fid1 = obj->lo_inodes[mea_idx].li_fid;
+                tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
                 lmv_obj_put(obj);
+        } else {
+                tgt_exp = lmv_find_export(lmv, &op_data->fid1);
         }
 
-        CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->namelen,
-               op_data->name, PFID(&op_data->fid1));
-
-        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
+        CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->namelen,
+               op_data->name, PFID(&op_data->fid1));
+
         rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid,
                        cap_effective, rdev, request);
         if (rc == 0) {
@@ -1359,7 +1361,7 @@ static int lmv_done_writing(struct obd_export *exp,
         if (rc)
                 RETURN(rc);
 
-        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+        tgt_exp = lmv_find_export(lmv, &op_data->fid1);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
@@ -1391,7 +1393,7 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype,
                 memset(op_data2, 0, sizeof(*op_data2));
                 op_data2->fid1 = mea->mea_ids[i];
 
-                tgt_exp = lmv_get_export(lmv, &op_data2->fid1);
+                tgt_exp = lmv_find_export(lmv, &op_data2->fid1);
                 if (IS_ERR(tgt_exp))
                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
 
@@ -1473,7 +1475,7 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type,
         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
         ptlrpc_req_finished(req);
 
-        tgt_exp = lmv_get_export(lmv, &fid_copy);
+        tgt_exp = lmv_find_export(lmv, &fid_copy);
         if (IS_ERR(tgt_exp))
                 GOTO(out, rc = PTR_ERR(tgt_exp));
 
@@ -1505,7 +1507,7 @@ lmv_enqueue(struct obd_export *exp, int lock_type,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        struct obd_export *tgt_exp;
+        struct obd_export *tgt_exp = NULL;
         struct lmv_obj *obj;
         int rc;
         ENTRY;
@@ -1533,16 +1535,19 @@ lmv_enqueue(struct obd_export *exp, int lock_type,
                                                (char *)op_data->name,
                                                op_data->namelen);
                         op_data->fid1 = obj->lo_inodes[mea_idx].li_fid;
+                        tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
                         lmv_obj_put(obj);
                 }
         }
-        CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
-               PFID(&op_data->fid1));
-
-        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+        
+        if (tgt_exp == NULL)
+                tgt_exp = lmv_find_export(lmv, &op_data->fid1);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
+        CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
+               PFID(&op_data->fid1));
+
         rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh,
                         lmm, lmmsize, cb_compl, cb_blocking, cb_data,
                         extra_lock_flags);
@@ -1582,16 +1587,17 @@ repeat:
                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                        filename, namelen - 1);
                 rid = obj->lo_inodes[mea_idx].li_fid;
+                tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
                 lmv_obj_put(obj);
+        } else {
+                tgt_exp = lmv_find_export(lmv, &rid);
         }
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
 
         CDEBUG(D_OTHER, "getattr_name for %*s on "DFID" -> "DFID"\n",
                namelen, filename, PFID(fid), PFID(&rid));
 
-        tgt_exp = lmv_get_export(lmv, &rid);
-        if (IS_ERR(tgt_exp))
-                RETURN(PTR_ERR(tgt_exp));
-
         rc = md_getattr_name(tgt_exp, &rid, oc, filename, namelen, valid,
                              ea_size, request);
         if (rc == 0) {
@@ -1606,7 +1612,7 @@ repeat:
                         CDEBUG(D_OTHER, "request attrs for "DFID"\n",
                                PFID(&rid));
 
-                        tgt_exp = lmv_get_export(lmv, &rid);
+                        tgt_exp = lmv_find_export(lmv, &rid);
                         if (IS_ERR(tgt_exp)) {
                                 ptlrpc_req_finished(*request);
                                 RETURN(PTR_ERR(tgt_exp));
@@ -1657,13 +1663,14 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
                                                op_data->name,
                                                op_data->namelen);
                         op_data->fid2 = obj->lo_inodes[mea_idx].li_fid;
+                        mds = obj->lo_inodes[mea_idx].li_mds;
                         lmv_obj_put(obj);
+                } else {
+                        rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
+                        if (rc)
+                                RETURN(rc);
                 }
 
-                rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
-                if (rc)
-                        RETURN(rc);
-
                 CDEBUG(D_OTHER,"link "DFID":%*s to "DFID"\n",
                        PFID(&op_data->fid2), op_data->namelen,
                        op_data->name, PFID(&op_data->fid1));
@@ -1695,8 +1702,8 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         struct lmv_obj *obj;
-        mdsno_t mds, mds2;
         int rc, mea_idx;
+        mdsno_t mds;
         ENTRY;
 
         CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n",
@@ -1747,8 +1754,13 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                        (char *)old, oldlen);
                 op_data->fid1 = obj->lo_inodes[mea_idx].li_fid;
+                mds = obj->lo_inodes[mea_idx].li_mds;
                 CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->fid1));
                 lmv_obj_put(obj);
+        } else {
+                rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
+                if (rc)
+                        RETURN(rc);
         }
 
         obj = lmv_obj_grab(obd, &op_data->fid2);
@@ -1764,21 +1776,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
                 CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->fid2));
                 lmv_obj_put(obj);
         }
-
-        rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
-        if (rc)
-                RETURN(rc);
-
 request:
-        rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds2);
-        if (rc)
-                RETURN(rc);
-
-        if (mds != mds2) {
-                CDEBUG(D_OTHER,"cross-node rename "DFID"/%*s to "DFID"/%*s\n",
-                       PFID(&op_data->fid1), oldlen, old,
-                       PFID(&op_data->fid2), newlen, new);
-        }
         op_data->fsuid = current->fsuid;
         op_data->fsgid = current->fsgid;
         op_data->cap   = current->cap_effective;
@@ -1813,7 +1811,7 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
                 for (i = 0; i < obj->lo_objcount; i++) {
                         op_data->fid1 = obj->lo_inodes[i].li_fid;
 
-                        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+                        tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
                         if (IS_ERR(tgt_exp)) {
                                 rc = PTR_ERR(tgt_exp);
                                 break;
@@ -1837,7 +1835,7 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
                 }
                 lmv_obj_put(obj);
         } else {
-                tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+                tgt_exp = lmv_find_export(lmv, &op_data->fid1);
                 if (IS_ERR(tgt_exp))
                         RETURN(PTR_ERR(tgt_exp));
 
@@ -1860,7 +1858,7 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
        if (rc)
                RETURN(rc);
 
-        tgt_exp = lmv_get_export(lmv, fid);
+        tgt_exp = lmv_find_export(lmv, fid);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
@@ -1937,14 +1935,16 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
                 do_div(index, (__u32)seg);
                 i = (int)index;
                 rid = obj->lo_inodes[i].li_fid;
+                tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
 
                 lmv_obj_unlock(obj);
 
                 CDEBUG(D_INFO, "forward to "DFID" with offset %lu i %d\n",
                        PFID(&rid), (unsigned long)offset, i);
+        } else {
+                tgt_exp = lmv_find_export(lmv, &rid);
         }
 
-        tgt_exp = lmv_get_export(lmv, &rid);
         if (IS_ERR(tgt_exp))
                 GOTO(cleanup, rc = PTR_ERR(tgt_exp));
 
@@ -1970,10 +1970,11 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
          * Here we could remove "." and ".." from all pages which at not from
          * master. But MDS has only "." and ".." for master dir.
          */
+        EXIT;
 cleanup:
         if (obj)
                 lmv_obj_put(obj);
-        RETURN(rc);
+        return rc;
 }
 
 static int lmv_unlink_slaves(struct obd_export *exp,
@@ -1999,7 +2000,7 @@ static int lmv_unlink_slaves(struct obd_export *exp,
                 op_data2->mode = MDS_MODE_DONT_LOCK | S_IFDIR;
                 op_data2->fsuid = current->fsuid;
                 op_data2->fsgid = current->fsgid;
-                tgt_exp = lmv_get_export(lmv, &op_data2->fid1);
+                tgt_exp = lmv_find_export(lmv, &op_data2->fid1);
                 if (IS_ERR(tgt_exp))
                         GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp));
 
@@ -2030,7 +2031,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        struct obd_export *tgt_exp;
+        struct obd_export *tgt_exp = NULL;
         int rc;
         ENTRY;
 
@@ -2055,6 +2056,8 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
                                                op_data->name,
                                                op_data->namelen);
                         op_data->fid1 = obj->lo_inodes[mea_idx].li_fid;
+                        tgt_exp = lmv_get_export(lmv, 
+                                        obj->lo_inodes[mea_idx].li_mds);
                         lmv_obj_put(obj);
                         CDEBUG(D_OTHER, "unlink '%*s' in "DFID" -> %u\n",
                                op_data->namelen, op_data->name,
@@ -2064,7 +2067,8 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
                 CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n",
                        PFID(&op_data->fid1));
         }
-        tgt_exp = lmv_get_export(lmv, &op_data->fid1);
+        if (tgt_exp == NULL)
+                tgt_exp = lmv_find_export(lmv, &op_data->fid1);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
         op_data->fsuid = current->fsuid;
@@ -2433,7 +2437,7 @@ int lmv_set_open_replay_data(struct obd_export *exp,
 
         ENTRY;
 
-        tgt_exp = lmv_get_export(lmv, &och->och_fid);
+        tgt_exp = lmv_find_export(lmv, &och->och_fid);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
@@ -2448,7 +2452,7 @@ int lmv_clear_open_replay_data(struct obd_export *exp,
         struct obd_export *tgt_exp;
         ENTRY;
 
-        tgt_exp = lmv_get_export(lmv, &och->och_fid);
+        tgt_exp = lmv_find_export(lmv, &och->och_fid);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
@@ -2470,7 +2474,7 @@ static int lmv_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
         if (rc)
                 RETURN(rc);
 
-        tgt_exp = lmv_get_export(lmv, fid);
+        tgt_exp = lmv_find_export(lmv, fid);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
@@ -2492,7 +2496,7 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
         if (rc)
                 RETURN(rc);
 
-        tgt_exp = lmv_get_export(lmv, &oc->c_capa.lc_fid);
+        tgt_exp = lmv_find_export(lmv, &oc->c_capa.lc_fid);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
index 4fb9692..03a0f04 100644 (file)
@@ -94,10 +94,21 @@ lmv_obj_alloc(struct obd_device *obd,
 
         /* put all ids in */
         for (i = 0; i < mea->mea_count; i++) {
+                int rc;
+                
                 CDEBUG(D_OTHER, "subobj "DFID"\n",
                        PFID(&mea->mea_ids[i]));
                 obj->lo_inodes[i].li_fid = mea->mea_ids[i];
                 LASSERT(fid_is_sane(&obj->lo_inodes[i].li_fid));
+
+                /* 
+                 * Cache slave mds number to use it in all cases it is needed
+                 * instead of constant lookup.
+                 */
+                rc = lmv_fld_lookup(lmv, &obj->lo_inodes[i].li_fid,
+                                    &obj->lo_inodes[i].li_mds);
+                if (rc)
+                        goto err_obj;
         }
 
         return obj;
@@ -308,7 +319,7 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid,
                 md.mea = NULL;
                 valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
 
-                tgt_exp = lmv_get_export(lmv, fid);
+                tgt_exp = lmv_find_export(lmv, fid);
                 if (IS_ERR(tgt_exp))
                         GOTO(cleanup, obj = (void *)tgt_exp);
 
index 0ab9c6d..e235d74 100644 (file)
@@ -203,7 +203,7 @@ int mdd_get_md(const struct lu_env *env, struct mdd_object *obj,
         } else if (rc < 0) {
                 CERROR("Error %d reading eadata \n", rc);
         } else {
-                /* FIXME convert lov EA but fixed after verification test */
+                /* XXX: convert lov EA but fixed after verification test. */
                 *md_size = rc;
         }
 
index 3dd8245..974cb09 100644 (file)
@@ -316,6 +316,7 @@ static int __mdd_lmv_get(const struct lu_env *env,
 
         if (ma->ma_valid & MA_LMV)
                 RETURN(0);
+        
         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
                         MDS_LMV_MD_NAME);
         if (rc > 0) {
@@ -351,7 +352,7 @@ static int mdd_attr_get_internal(const struct lu_env *env,
         }
 #endif
         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
-                        rc, ma->ma_valid);
+               rc, ma->ma_valid);
         RETURN(rc);
 }