Whamcloud - gitweb
Branch: HEAD
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
index c1cb22a..3ff3fbb 100644 (file)
@@ -31,6 +31,7 @@
 #include <linux/pagemap.h>
 #include <asm/div64.h>
 #include <linux/seq_file.h>
+#include <linux/namei.h>
 #else
 #include <liblustre.h>
 #endif
 #include <linux/lprocfs_status.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/obd_lmv.h>
+#include <linux/lustre_lite.h>
 #include "lmv_internal.h"
 
-
 static inline void lmv_drop_intent_lock(struct lookup_intent *it)
 {
-        if (it->d.lustre.it_lock_mode != 0)
-                ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
-                                 it->d.lustre.it_lock_mode);
+        if (LUSTRE_IT(it)->it_lock_mode != 0)
+                ldlm_lock_decref((void *)&LUSTRE_IT(it)->it_lock_handle,
+                                 LUSTRE_IT(it)->it_lock_mode);
 }
 
 int lmv_handle_remote_inode(struct obd_export *exp, void *lmm,
@@ -80,7 +81,7 @@ int lmv_handle_remote_inode(struct obd_export *exp, void *lmm,
                 struct lustre_id nid;
                 int pmode;
 
-                if (it->it_op == IT_LOOKUP) {
+                if (it->it_op == IT_LOOKUP || it->it_op == IT_CHDIR) {
                         /*
                          * unfortunately, we have to lie to MDC/MDS to retrieve
                          * attributes llite needs.
@@ -89,18 +90,19 @@ int lmv_handle_remote_inode(struct obd_export *exp, void *lmm,
                 }
 
                 /* we got LOOKUP lock, but we really need attrs */
-                pmode = it->d.lustre.it_lock_mode;
+                pmode = LUSTRE_IT(it)->it_lock_mode;
                 if (pmode) {
-                        memcpy(&plock, &it->d.lustre.it_lock_handle,
+                        memcpy(&plock, &LUSTRE_IT(it)->it_lock_handle,
                                sizeof(plock));
-                        it->d.lustre.it_lock_mode = 0;
+                        LUSTRE_IT(it)->it_lock_mode = 0;
                 }
 
+                LASSERT((body->valid & OBD_MD_FID) != 0);
+                
                 nid = body->id1;
-                it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
-                rc = md_intent_lock(lmv->tgts[id_group(&nid)].ltd_exp, &nid,
-                                    NULL, 0, lmm, lmmsize, NULL, it, flags,
-                                    &req, cb_blocking);
+                LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE;
+                rc = md_intent_lock(lmv->tgts[id_group(&nid)].ltd_exp, &nid, NULL,
+                                    0, lmm, lmmsize, NULL, it, flags, &req, cb_blocking);
 
                 /*
                  * llite needs LOOKUP lock to track dentry revocation in order
@@ -109,10 +111,9 @@ int lmv_handle_remote_inode(struct obd_export *exp, void *lmm,
                  */
                 if (rc == 0) {
                         lmv_drop_intent_lock(it);
-                        memcpy(&it->d.lustre.it_lock_handle, &plock,
+                        memcpy(&LUSTRE_IT(it)->it_lock_handle, &plock,
                                sizeof(plock));
-                        it->d.lustre.it_lock_mode = pmode;
-                        
+                        LUSTRE_IT(it)->it_lock_mode = pmode;
                 } else if (pmode)
                         ldlm_lock_decref(&plock, pmode);
 
@@ -132,9 +133,9 @@ int lmv_intent_open(struct obd_export *exp, struct lustre_id *pid,
         struct lmv_obd *lmv = &obd->u.lmv;
         struct mds_body *body = NULL;
         struct lustre_id rpid = *pid;
+        int rc, mds, loop = 0;
         struct lmv_obj *obj;
         struct mea *mea;
-        int rc, mds, loop = 0;
         ENTRY;
 
         /* IT_OPEN is intended to open (and create, possible) an object. Parent
@@ -157,12 +158,11 @@ repeat:
         }
 
         rc = md_intent_lock(lmv->tgts[id_group(&rpid)].ltd_exp, &rpid, name,
-                            len, lmm, lmmsize, cid, it, flags, reqp,
-                            cb_blocking);
+                            len, lmm, lmmsize, cid, it, flags, reqp, cb_blocking);
         if (rc == -ERESTART) {
                 /* directory got splitted. time to update local object and
                  * repeat the request with proper MDS */
-                LASSERT(lmv_id_equal(pid, &rpid));
+                LASSERT(id_equal_fid(pid, &rpid));
                 rc = lmv_get_mea_and_update_object(exp, &rpid);
                 if (rc == 0) {
                         ptlrpc_req_finished(*reqp);
@@ -178,17 +178,32 @@ repeat:
                                      flags, reqp, cb_blocking);
         if (rc != 0) {
                 LASSERT(rc < 0);
-                CERROR("can't handle remote %s: dir "DLID4"("DLID4"):"
+
+                /* 
+                 * this is possible, that some userspace application will try to
+                 * open file as directory and we will have error -20 here. As
+                 * this is "usual" situation, we should not print error here,
+                 * only debug info.
+                 */
+                CDEBUG(D_OTHER, "can't handle remote %s: dir "DLID4"("DLID4"):"
                        "%*s: %d\n", LL_IT2STR(it), OLID4(pid), OLID4(&rpid),
                        len, name, rc);
                 RETURN(rc);
         }
 
+        /*
+         * nothing is found, do not access body->id1 as it is zero and thus
+         * pointless.
+         */
+        if (LUSTRE_IT(it)->it_disposition & DISP_LOOKUP_NEG)
+                RETURN(0);
+
         /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
          * to update them for splitted dir */
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
-
+        LASSERT((body->valid & OBD_MD_FID) != 0);
+        
         cid = &body->id1;
         obj = lmv_grab_obj(obd, cid);
         if (!obj && (mea = lmv_splitted_dir_body(*reqp, 1))) {
@@ -226,7 +241,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
         struct lmv_obd *lmv = &obd->u.lmv;
         struct mds_body *body = NULL;
         struct lustre_id rpid = *pid;
-        struct lmv_obj *obj, *obj2;
+        struct lmv_obj *obj = NULL, *obj2 = NULL;
         struct mea *mea;
         int rc = 0, mds;
         ENTRY;
@@ -236,16 +251,18 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
                  * slaves if requested object is splitted directory */
                 CDEBUG(D_OTHER, "revalidate attrs for "DLID4"\n", OLID4(cid));
                 mds = id_group(cid);
+#if 0
                 obj = lmv_grab_obj(obd, cid);
                 if (obj) {
                         /* in fact, we need not this with current intent_lock(),
                          * but it may change some day */
-                        if (!lmv_id_equal(pid, cid)){
+                        if (!id_equal_fid(pid, cid)){
                                 rpid = obj->objs[mds].id;
                                 mds = id_group(&rpid);
                         }
                         lmv_put_obj(obj);
-               }
+                }
+#endif
         } else {
                 CDEBUG(D_OTHER, "INTENT getattr for %*s on "DLID4"\n",
                        len, name, OLID4(pid));
@@ -254,7 +271,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
                 if (obj && len) {
                         /* directory is already splitted. calculate mds */
                         mds = raw_name2idx(obj->hashtype, obj->objcount, 
-                                           (char *) name, len);
+                                           (char *)name, len);
                         rpid = obj->objs[mds].id;
                         mds = id_group(&rpid);
                         lmv_put_obj(obj);
@@ -262,10 +279,11 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
                         CDEBUG(D_OTHER, "forward to MDS #%u (slave "DLID4")\n",
                                mds, OLID4(&rpid));
                 }
-        } 
-        rc = md_intent_lock(lmv->tgts[mds].ltd_exp, &rpid, name,
-                            len, lmm, lmmsize, cid, it, flags, reqp,
-                            cb_blocking);
+        }
+
+        /* the same about fid returning. */
+        rc = md_intent_lock(lmv->tgts[mds].ltd_exp, &rpid, name, len, lmm,
+                            lmmsize, cid, it, flags, reqp, cb_blocking);
         if (rc < 0)
                 RETURN(rc);
        
@@ -296,9 +314,17 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
         if (rc < 0)
                 RETURN(rc);
 
+        /*
+         * nothing is found, do not access body->id1 as it is zero and thus
+         * pointless.
+         */
+        if (LUSTRE_IT(it)->it_disposition & DISP_LOOKUP_NEG)
+                RETURN(0);
+                
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
-        
+        LASSERT((body->valid & OBD_MD_FID) != 0);
+
         cid = &body->id1;
         obj2 = lmv_grab_obj(obd, cid);
 
@@ -317,7 +343,8 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
                 CDEBUG(D_OTHER, "attrs from slaves for "DLID4", rc %d\n",
                        OLID4(cid), rc);
                 
-                rc = lmv_revalidate_slaves(exp, reqp, cid, it, 1, cb_blocking);
+                rc = lmv_revalidate_slaves(exp, reqp, cid, it, 1,
+                                           cb_blocking);
                 lmv_put_obj(obj2);
         }
         RETURN(rc);
@@ -356,6 +383,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
 
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
+        LASSERT((body->valid & OBD_MD_FID) != 0);
 
         obj = lmv_grab_obj(obd, &body->id1);
         LASSERT(obj != NULL);
@@ -370,7 +398,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 struct ptlrpc_request *req = NULL;
                 struct lookup_intent it;
 
-                if (lmv_id_equal(&id, &obj->id))
+                if (id_equal_fid(&id, &obj->id))
                         /* skip master obj */
                         continue;
 
@@ -379,11 +407,13 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 /* is obj valid? */
                 memset(&it, 0, sizeof(it));
                 it.it_op = IT_GETATTR;
+                OBD_ALLOC(it.d.fs_data, sizeof(struct lustre_intent_data));
+
                 rc = md_intent_lock(lmv->tgts[id_group(&id)].ltd_exp, &id,
                                     NULL, 0, NULL, 0, &id, &it, 0, &req,
                                     lmv_dirobj_blocking_ast);
                 
-                lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
+                lockh = (struct lustre_handle *)&LUSTRE_IT(&it)->it_lock_handle;
                 if (rc > 0 && req == NULL) {
                         /* nice, this slave is valid */
                         LASSERT(req == NULL);
@@ -391,10 +421,11 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                         goto release_lock;
                 }
 
-                if (rc < 0)
+                if (rc < 0) {
+                        OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
                         /* error during lookup */
                         GOTO(cleanup, rc);
-                
+                } 
                 lock = ldlm_handle2lock(lockh);
                 LASSERT(lock);
 
@@ -415,13 +446,16 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
 release_lock:
                 lmv_update_body_from_obj(body, obj->objs + i);
 
-                if (it.d.lustre.it_lock_mode)
-                        ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
+                if (LUSTRE_IT(&it)->it_lock_mode)
+                        ldlm_lock_decref(lockh, LUSTRE_IT(&it)->it_lock_mode);
+                OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
         }
+
+        EXIT;
 cleanup:
         lmv_unlock_obj(obj);
         lmv_put_obj(obj);
-        RETURN(rc);
+        return rc;
 }
 
 int lmv_intent_lookup(struct obd_export *exp, struct lustre_id *pid,
@@ -444,7 +478,6 @@ int lmv_intent_lookup(struct obd_export *exp, struct lustre_id *pid,
          * this lookup below) or to confirm requested resolving is still valid
          * (let's call this revalidation) cid != NULL specifies revalidation.
          */
-
         if (cid) {
                 /*
                  * this is revalidation: we have to check is LOOKUP lock still
@@ -468,8 +501,9 @@ int lmv_intent_lookup(struct obd_export *exp, struct lustre_id *pid,
                 mds = id_group(pid);
 repeat:
                 LASSERT(++loop <= 2);
-                /* this is lookup. during lookup we have to update all the 
-                 * attributes, because returned values will be put in struct 
+                
+                /* this is lookup. during lookup we have to update all the
+                 * attributes, because returned values will be put in struct
                  * inode */
 
                 obj = lmv_grab_obj(obd, pid);
@@ -484,9 +518,9 @@ repeat:
                         lmv_put_obj(obj);
                 }
         }
-        rc = md_intent_lock(lmv->tgts[mds].ltd_exp, pid, name,
-                            len, lmm, lmmsize, cid, it, flags, reqp, 
-                            cb_blocking);
+        rc = md_intent_lock(lmv->tgts[mds].ltd_exp, &rpid, name,
+                            len, lmm, lmmsize, cid, it, flags,
+                            reqp, cb_blocking);
         if (rc > 0) {
                 LASSERT(cid != 0);
                 RETURN(rc);
@@ -524,15 +558,16 @@ repeat:
         if (rc < 0)
                 RETURN(rc);
 
-        /* okay, MDS has returned success. probably name has been resolved in
-         * remote inode */
-        rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it, flags,
-                                     reqp, cb_blocking);
+        /* okay, MDS has returned success. Probably name has been resolved in
+         * remote inode. */
+        rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it,
+                                     flags, reqp, cb_blocking);
 
         if (rc == 0 && (mea = lmv_splitted_dir_body(*reqp, 1))) {
                 /* wow! this is splitted dir, we'd like to handle it */
                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
                 LASSERT(body != NULL);
+                LASSERT((body->valid & OBD_MD_FID) != 0);
                 
                 obj = lmv_grab_obj(obd, &body->id1);
                 if (!obj) {
@@ -625,9 +660,11 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
 
                 memset(&it, 0, sizeof(it));
                 it.it_op = IT_GETATTR;
+
                 cb = lmv_dirobj_blocking_ast;
 
-                if (lmv_id_equal(&id, &obj->id)) {
+                OBD_ALLOC(it.d.fs_data, sizeof(struct lustre_intent_data));
+                if (id_equal_fid(&id, &obj->id)) {
                         if (master_valid) {
                                 /* lmv_intent_getattr() already checked
                                  * validness and took the lock */
@@ -648,11 +685,12 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                         cb = cb_blocking;
                 }
 
+               
                 /* is obj valid? */
                 rc = md_intent_lock(lmv->tgts[id_group(&id)].ltd_exp,
                                     &id, NULL, 0, NULL, 0, &id, &it, 0, 
                                     &req, cb);
-                lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
+                lockh = (struct lustre_handle *) &LUSTRE_IT(&it)->it_lock_handle;
                 if (rc > 0 && req == NULL) {
                         /* nice, this slave is valid */
                         LASSERT(req == NULL);
@@ -660,17 +698,18 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                         goto release_lock;
                 }
 
-                if (rc < 0)
+                if (rc < 0) {
+                        OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
                         /* error during revalidation */
                         GOTO(cleanup, rc);
-
+                }
                 if (master) {
                         LASSERT(master_valid == 0);
                         /* save lock on master to be returned to the caller */
                         CDEBUG(D_OTHER, "no lock on master yet\n");
                         memcpy(&master_lockh, lockh, sizeof(master_lockh));
-                        master_lock_mode = it.d.lustre.it_lock_mode;
-                        it.d.lustre.it_lock_mode = 0;
+                        master_lock_mode = LUSTRE_IT(&it)->it_lock_mode;
+                        LUSTRE_IT(&it)->it_lock_mode = 0;
                 } else {
                         /* this is slave. we want to control it */
                         lock = ldlm_handle2lock(lockh);
@@ -696,14 +735,15 @@ update:
                 
                 CDEBUG(D_OTHER, "fresh: %lu\n",
                        (unsigned long)obj->objs[i].size);
-
+                
                 if (req)
                         ptlrpc_req_finished(req);
 release_lock:
                 size += obj->objs[i].size;
 
-                if (it.d.lustre.it_lock_mode)
-                        ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
+                if (LUSTRE_IT(&it)->it_lock_mode)
+                        ldlm_lock_decref(lockh, LUSTRE_IT(&it)->it_lock_mode);
+                OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
         }
 
         if (*reqp) {
@@ -719,27 +759,30 @@ release_lock:
                 body->size = size;
                 
                 if (mreq == NULL) {
-                        /* very important to maintain lli->mds the same because
-                         * of revalidation. mreq == NULL means that caller has
-                         * no reply and the only attr we can return is size */
+                        /* very important to maintain id_group(lli->lli_id) the
+                         * same because of revalidation. mreq == NULL means that
+                         * caller has no reply and the only attr we can return
+                         * is size */
                         body->valid = OBD_MD_FLSIZE;
 //                        body->mds = id_group(&obj->id);
                 }
                 if (master_valid == 0) {
-                        memcpy(&oit->d.lustre.it_lock_handle,
+                        memcpy(&LUSTRE_IT(oit)->it_lock_handle,
                                &master_lockh, sizeof(master_lockh));
-                        oit->d.lustre.it_lock_mode = master_lock_mode;
+                        LUSTRE_IT(oit)->it_lock_mode = master_lock_mode;
                 }
                 rc = 0;
         } else {
                 /* it seems all the attrs are fresh and we did no request */
                 CDEBUG(D_OTHER, "all the attrs were fresh\n");
                 if (master_valid == 0)
-                        oit->d.lustre.it_lock_mode = master_lock_mode;
+                        LUSTRE_IT(oit)->it_lock_mode = master_lock_mode;
                 rc = 1;
         }
+
+        EXIT;
 cleanup:
         lmv_unlock_obj(obj);
         lmv_put_obj(obj);
-        RETURN(rc);
+        return rc;
 }