Whamcloud - gitweb
Fixes and cleanups in lmv.
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
index 81e4baa..449c01b 100644 (file)
@@ -30,6 +30,7 @@
 #include <linux/slab.h>
 #include <linux/pagemap.h>
 #include <asm/div64.h>
+#include <linux/seq_file.h>
 #else
 #include <liblustre.h>
 #endif
@@ -42,7 +43,6 @@
 #include <linux/lustre_mds.h>
 #include <linux/obd_class.h>
 #include <linux/obd_ost.h>
-#include <linux/seq_file.h>
 #include <linux/lprocfs_status.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/obd_lmv.h>
@@ -88,7 +88,7 @@ int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt,
 
                 nfid = body->fid1;
                 it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
-                rc = md_intent_lock(lmv->tgts[nfid.mds].exp, uctxt, &nfid,
+                rc = md_intent_lock(lmv->tgts[nfid.mds].ltd_exp, uctxt, &nfid,
                                     NULL, 0, lmm, lmmsize, NULL, it, flags,
                                     &req, cb_blocking);
 
@@ -131,20 +131,31 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt,
         /* IT_OPEN is intended to open (and create, possible) an object.
          * parent (pfid) may be splitted dir */
 
-        mds = pfid->mds;
-        obj = lmv_grab_obj(obd, pfid, 0);
+repeat:
+        mds = rpfid.mds;
+        obj = lmv_grab_obj(obd, &rpfid, 0);
         if (obj) {
                 /* directory is already splitted, so we have to forward
                  * request to the right MDS */
-                mds = raw_name2idx(obj->objcount, name, len);
+                mds = raw_name2idx(obj->objcount, (char *)name, len);
                 rpfid = obj->objs[mds].fid;
                 CDEBUG(D_OTHER, "forward to MDS #%u\n", mds);
         }
 
-        rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, len,
-                            lmm, lmmsize, cfid, it, flags, reqp, cb_blocking);
-       
+        rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
+                            len, lmm, lmmsize, cfid, it, flags, reqp,
+                            cb_blocking);
         lmv_put_obj(obj);
+        if (rc == -ERESTART) {
+                /* directory got splitted. time to update local object
+                 * and repeat the request with proper MDS */
+                LASSERT(fid_equal(pfid, &rpfid));
+                rc = lmv_get_mea_and_update_object(exp, &rpfid);
+                if (rc == 0) {
+                        ptlrpc_req_finished(*reqp);
+                        goto repeat;
+                }
+        }
         if (rc != 0)
                 RETURN(rc);
 
@@ -217,7 +228,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt,
                          * _intent_lock(), but it may change some day */
                         rpfid = obj->objs[mds].fid;
                 }
-                rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
+                rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
                                     len, lmm, lmmsize, cfid, it, flags, reqp,
                                     cb_blocking);
                 if (obj && rc >= 0) {
@@ -254,7 +265,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt,
                        (unsigned long) rpfid.id,
                        (unsigned long) rpfid.generation);
         }
-        rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
+        rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
                             len, lmm, lmmsize, NULL, it, flags, reqp,
                             cb_blocking);
         if (rc < 0)
@@ -360,7 +371,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 /* is obj valid? */
                 memset(&it, 0, sizeof(it));
                 it.it_op = IT_GETATTR;
-                rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
+                rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
                                     NULL, 0, NULL, 0, &fid, &it, 0, &req,
                                     lmv_dirobj_blocking_ast);
                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
@@ -382,7 +393,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 req = NULL;
                 memset(&it, 0, sizeof(it));
                 it.it_op = IT_GETATTR;
-                rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
+                rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
                                     NULL, 0, NULL, 0, NULL, &it, 0, &req,
                                     lmv_dirobj_blocking_ast);
                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
@@ -455,7 +466,7 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
                        (unsigned long) cfid->mds,
                        (unsigned long) cfid->id,
                        (unsigned long) cfid->generation, mds);
-                rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, pfid, name,
+                rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, pfid, name,
                                     len, lmm, lmmsize, cfid, it, flags,
                                     reqp, cb_blocking);
                 RETURN(rc);
@@ -475,7 +486,7 @@ repeat:
                 lmv_put_obj(obj);
         }
 
-        rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
+        rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
                             len, lmm, lmmsize, NULL, it, flags, reqp,
                             cb_blocking);
         if (rc > 0) {
@@ -498,7 +509,7 @@ repeat:
                 RETURN(rc);
         }
        
-        if (rc == -ESTALE) {
+        if (rc == -ERESTART) {
                 /* directory got splitted since last update. this shouldn't
                  * be becasue splitting causes lock revocation, so revalidate
                  * had to fail and lookup on dir had to return mea */
@@ -549,7 +560,10 @@ int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
                         LL_IT2STR(it), len, name, (unsigned long) pfid->id,
                         (unsigned long) pfid->generation, pfid->mds);
 
-        lmv_connect(obd);
+        rc = lmv_check_connect(obd);
+        if (rc)
+                RETURN(rc);
+
         if (it->it_op == IT_LOOKUP)
                 rc = lmv_intent_lookup(exp, uctxt, pfid, name, len, lmm,
                                        lmmsize, cfid, it, flags, reqp,
@@ -558,7 +572,7 @@ int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
                 rc = lmv_intent_open(exp, uctxt, pfid, name, len, lmm,
                                      lmmsize, cfid, it, flags, reqp,
                                      cb_blocking);
-        else if (it->it_op == IT_GETATTR)
+        else if (it->it_op == IT_GETATTR || it->it_op == IT_CHDIR)
                 rc = lmv_intent_getattr(exp, uctxt, pfid, name, len, lmm,
                                         lmmsize, cfid, it, flags, reqp,
                                         cb_blocking);
@@ -634,7 +648,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                 }
 
                 /* is obj valid? */
-                rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
+                rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
                                     NULL, 0, NULL, 0, &fid, &it, 0, &req, cb);
                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
                 if (rc > 0) {
@@ -655,7 +669,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                 req = NULL;
                 memset(&it, 0, sizeof(it));
                 it.it_op = IT_GETATTR;
-                rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
+                rc = md_intent_lock(lmv->tgts[fid.mds].ltd_exp, &uctxt, &fid,
                                     NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
                 LASSERT(rc <= 0);