Whamcloud - gitweb
- many gcc4 compilation fixes (warnings)
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
index 61b9c0b..ea613ee 100644 (file)
@@ -31,6 +31,7 @@
 #include <linux/pagemap.h>
 #include <asm/div64.h>
 #include <linux/seq_file.h>
+#include <linux/namei.h>
 #else
 #include <liblustre.h>
 #endif
 #include <linux/lprocfs_status.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/obd_lmv.h>
+#include <linux/lustre_lite.h>
 #include "lmv_internal.h"
 
-
 static inline void lmv_drop_intent_lock(struct lookup_intent *it)
 {
-        if (it->d.lustre.it_lock_mode != 0)
-                ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
-                                 it->d.lustre.it_lock_mode);
+        if (LUSTRE_IT(it)->it_lock_mode != 0)
+                ldlm_lock_decref((void *)&LUSTRE_IT(it)->it_lock_handle,
+                                 LUSTRE_IT(it)->it_lock_mode);
 }
 
-int lmv_handle_remote_inode(struct obd_export *exp, void *lmm,
-                            int lmmsize, struct lookup_intent *it,
-                            int flags, struct ptlrpc_request **reqp,
-                            ldlm_blocking_callback cb_blocking)
+int lmv_intent_remote(struct obd_export *exp, void *lmm,
+                      int lmmsize, struct lookup_intent *it,
+                      int flags, struct ptlrpc_request **reqp,
+                      ldlm_blocking_callback cb_blocking)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
+        struct ptlrpc_request *req = NULL;
         struct mds_body *body = NULL;
-        int rc = 0;
+        struct lustre_handle plock;
+        struct lustre_id nid;
+        int pmode, rc = 0;
         ENTRY;
 
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
 
-        if (body->valid & OBD_MD_MDS) {
+        if (!(body->valid & OBD_MD_MDS))
+                RETURN(0);
+
+        /*
+         * oh, MDS reports that this is remote inode case i.e. we have to ask
+         * for real attrs on another MDS.
+         */
+        if (it->it_op == IT_LOOKUP || it->it_op == IT_CHDIR) {
                 /*
-                 * oh, MDS reports that this is remote inode case i.e. we have
-                 * to ask for real attrs on another MDS.
+                 * unfortunately, we have to lie to MDC/MDS to retrieve
+                 * attributes llite needs.
                  */
-                struct ptlrpc_request *req = NULL;
-                struct lustre_handle plock;
-                struct lustre_id nid;
-                int pmode;
-
-                if (it->it_op == IT_LOOKUP || it->it_op == IT_CHDIR) {
-                        /*
-                         * unfortunately, we have to lie to MDC/MDS to retrieve
-                         * attributes llite needs.
-                         */
-                        it->it_op = IT_GETATTR;
-                }
+                it->it_op = IT_GETATTR;
+        }
 
-                /* we got LOOKUP lock, but we really need attrs */
-                pmode = it->d.lustre.it_lock_mode;
-                if (pmode) {
-                        memcpy(&plock, &it->d.lustre.it_lock_handle,
-                               sizeof(plock));
-                        it->d.lustre.it_lock_mode = 0;
-                }
+        /* we got LOOKUP lock, but we really need attrs */
+        pmode = LUSTRE_IT(it)->it_lock_mode;
+        if (pmode) {
+                memcpy(&plock, &LUSTRE_IT(it)->it_lock_handle,
+                       sizeof(plock));
+                LUSTRE_IT(it)->it_lock_mode = 0;
+                LUSTRE_IT(it)->it_data = 0;
+        }
 
-                nid = body->id1;
-                it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
-                rc = md_intent_lock(lmv->tgts[id_group(&nid)].ltd_exp, &nid,
-                                    NULL, 0, lmm, lmmsize, NULL, it, flags,
-                                    &req, cb_blocking);
+        LASSERT((body->valid & OBD_MD_FID) != 0);
+                
+        nid = body->id1;
+        LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE;
+        rc = md_intent_lock(lmv->tgts[id_group(&nid)].ltd_exp, &nid,
+                            NULL, 0, lmm, lmmsize, NULL, it, flags,
+                            &req, cb_blocking);
 
-                /*
-                 * llite needs LOOKUP lock to track dentry revocation in order
-                 * to maintain dcache consistency. Thus drop UPDATE lock here
-                 * and put LOOKUP in request.
-                 */
-                if (rc == 0) {
-                        lmv_drop_intent_lock(it);
-                        memcpy(&it->d.lustre.it_lock_handle, &plock,
-                               sizeof(plock));
-                        it->d.lustre.it_lock_mode = pmode;
-                        
-                } else if (pmode)
-                        ldlm_lock_decref(&plock, pmode);
-
-                ptlrpc_req_finished(*reqp);
-                *reqp = req;
+        /*
+         * llite needs LOOKUP lock to track dentry revocation in order to
+         * maintain dcache consistency. Thus drop UPDATE lock here and put
+         * LOOKUP in request.
+         */
+        if (rc == 0) {
+                lmv_drop_intent_lock(it);
+                memcpy(&LUSTRE_IT(it)->it_lock_handle, &plock,
+                       sizeof(plock));
+                LUSTRE_IT(it)->it_lock_mode = pmode;
+        } else if (pmode) {
+                ldlm_lock_decref(&plock, pmode);
         }
+
+        ptlrpc_req_finished(*reqp);
+        *reqp = req;
         RETURN(rc);
 }
 
@@ -157,8 +160,7 @@ repeat:
         }
 
         rc = md_intent_lock(lmv->tgts[id_group(&rpid)].ltd_exp, &rpid, name,
-                            len, lmm, lmmsize, cid, it, flags, reqp,
-                            cb_blocking);
+                            len, lmm, lmmsize, cid, it, flags, reqp, cb_blocking);
         if (rc == -ERESTART) {
                 /* directory got splitted. time to update local object and
                  * repeat the request with proper MDS */
@@ -174,14 +176,13 @@ repeat:
 
         /* okay, MDS has returned success. Probably name has been resolved in
          * remote inode */
-        rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it,
-                                     flags, reqp, cb_blocking);
+        rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, cb_blocking);
         if (rc != 0) {
                 LASSERT(rc < 0);
 
                 /* 
                  * this is possible, that some userspace application will try to
-                 * open file as directory and we will have error -20 here. As
+                 * open file as directory and we will have -ENOTDIR here. As
                  * this is "usual" situation, we should not print error here,
                  * only debug info.
                  */
@@ -191,11 +192,24 @@ repeat:
                 RETURN(rc);
         }
 
+        /*
+         * nothing is found, do not access body->id1 as it is zero and thus
+         * pointless.
+         */
+        if ((LUSTRE_IT(it)->it_disposition & DISP_LOOKUP_NEG) &&
+            !(LUSTRE_IT(it)->it_disposition & DISP_OPEN_CREATE) &&
+            !(LUSTRE_IT(it)->it_disposition & DISP_OPEN_OPEN))
+                RETURN(0);
+
         /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
          * to update them for splitted dir */
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
 
+        /* could not find object, FID is not present in response. */
+        if (!(body->valid & OBD_MD_FID))
+                RETURN(0);
+        
         cid = &body->id1;
         obj = lmv_grab_obj(obd, cid);
         if (!obj && (mea = lmv_splitted_dir_body(*reqp, 1))) {
@@ -233,7 +247,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
         struct lmv_obd *lmv = &obd->u.lmv;
         struct mds_body *body = NULL;
         struct lustre_id rpid = *pid;
-        struct lmv_obj *obj, *obj2;
+        struct lmv_obj *obj = NULL, *obj2 = NULL;
         struct mea *mea;
         int rc = 0, mds;
         ENTRY;
@@ -243,6 +257,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
                  * slaves if requested object is splitted directory */
                 CDEBUG(D_OTHER, "revalidate attrs for "DLID4"\n", OLID4(cid));
                 mds = id_group(cid);
+#if 0
                 obj = lmv_grab_obj(obd, cid);
                 if (obj) {
                         /* in fact, we need not this with current intent_lock(),
@@ -252,7 +267,8 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
                                 mds = id_group(&rpid);
                         }
                         lmv_put_obj(obj);
-               }
+                }
+#endif
         } else {
                 CDEBUG(D_OTHER, "INTENT getattr for %*s on "DLID4"\n",
                        len, name, OLID4(pid));
@@ -270,9 +286,10 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
                                mds, OLID4(&rpid));
                 }
         }
-        rc = md_intent_lock(lmv->tgts[mds].ltd_exp, &rpid, name,
-                            len, lmm, lmmsize, cid, it, flags, reqp,
-                            cb_blocking);
+
+        /* the same about fid returning. */
+        rc = md_intent_lock(lmv->tgts[mds].ltd_exp, &rpid, name, len, lmm,
+                            lmmsize, cid, it, flags, reqp, cb_blocking);
         if (rc < 0)
                 RETURN(rc);
        
@@ -298,14 +315,25 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
  
         /* okay, MDS has returned success. probably name has been
          * resolved in remote inode */
-        rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it,
-                                     flags, reqp, cb_blocking);
+        rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
+                               reqp, cb_blocking);
         if (rc < 0)
                 RETURN(rc);
 
+        /*
+         * nothing is found, do not access body->id1 as it is zero and thus
+         * pointless.
+         */
+        if (LUSTRE_IT(it)->it_disposition & DISP_LOOKUP_NEG)
+                RETURN(0);
+                
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
-        
+
+        /* could not find object, FID is not present in response. */
+        if (!(body->valid & OBD_MD_FID))
+                RETURN(0);
+
         cid = &body->id1;
         obj2 = lmv_grab_obj(obd, cid);
 
@@ -324,7 +352,8 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
                 CDEBUG(D_OTHER, "attrs from slaves for "DLID4", rc %d\n",
                        OLID4(cid), rc);
                 
-                rc = lmv_revalidate_slaves(exp, reqp, cid, it, 1, cb_blocking);
+                rc = lmv_revalidate_slaves(exp, reqp, cid, it, 1,
+                                           cb_blocking);
                 lmv_put_obj(obj2);
         }
         RETURN(rc);
@@ -363,6 +392,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
 
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
+        LASSERT((body->valid & OBD_MD_FID) != 0);
 
         obj = lmv_grab_obj(obd, &body->id1);
         LASSERT(obj != NULL);
@@ -386,11 +416,13 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 /* is obj valid? */
                 memset(&it, 0, sizeof(it));
                 it.it_op = IT_GETATTR;
+                OBD_ALLOC(it.d.fs_data, sizeof(struct lustre_intent_data));
+
                 rc = md_intent_lock(lmv->tgts[id_group(&id)].ltd_exp, &id,
                                     NULL, 0, NULL, 0, &id, &it, 0, &req,
                                     lmv_dirobj_blocking_ast);
                 
-                lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
+                lockh = (struct lustre_handle *)&LUSTRE_IT(&it)->it_lock_handle;
                 if (rc > 0 && req == NULL) {
                         /* nice, this slave is valid */
                         LASSERT(req == NULL);
@@ -398,10 +430,11 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                         goto release_lock;
                 }
 
-                if (rc < 0)
+                if (rc < 0) {
+                        OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
                         /* error during lookup */
                         GOTO(cleanup, rc);
-                
+                } 
                 lock = ldlm_handle2lock(lockh);
                 LASSERT(lock);
 
@@ -422,8 +455,9 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
 release_lock:
                 lmv_update_body_from_obj(body, obj->objs + i);
 
-                if (it.d.lustre.it_lock_mode)
-                        ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
+                if (LUSTRE_IT(&it)->it_lock_mode)
+                        ldlm_lock_decref(lockh, LUSTRE_IT(&it)->it_lock_mode);
+                OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
         }
 
         EXIT;
@@ -494,8 +528,8 @@ repeat:
                 }
         }
         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, &rpid, name,
-                            len, lmm, lmmsize, cid, it, flags, reqp, 
-                            cb_blocking);
+                            len, lmm, lmmsize, cid, it, flags,
+                            reqp, cb_blocking);
         if (rc > 0) {
                 LASSERT(cid != 0);
                 RETURN(rc);
@@ -535,13 +569,13 @@ repeat:
 
         /* okay, MDS has returned success. Probably name has been resolved in
          * remote inode. */
-        rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it, flags,
-                                     reqp, cb_blocking);
+        rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, cb_blocking);
 
         if (rc == 0 && (mea = lmv_splitted_dir_body(*reqp, 1))) {
                 /* wow! this is splitted dir, we'd like to handle it */
                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
                 LASSERT(body != NULL);
+                LASSERT((body->valid & OBD_MD_FID) != 0);
                 
                 obj = lmv_grab_obj(obd, &body->id1);
                 if (!obj) {
@@ -634,8 +668,10 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
 
                 memset(&it, 0, sizeof(it));
                 it.it_op = IT_GETATTR;
+
                 cb = lmv_dirobj_blocking_ast;
 
+                OBD_ALLOC(it.d.fs_data, sizeof(struct lustre_intent_data));
                 if (id_equal_fid(&id, &obj->id)) {
                         if (master_valid) {
                                 /* lmv_intent_getattr() already checked
@@ -657,11 +693,12 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                         cb = cb_blocking;
                 }
 
+               
                 /* is obj valid? */
                 rc = md_intent_lock(lmv->tgts[id_group(&id)].ltd_exp,
                                     &id, NULL, 0, NULL, 0, &id, &it, 0, 
                                     &req, cb);
-                lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
+                lockh = (struct lustre_handle *) &LUSTRE_IT(&it)->it_lock_handle;
                 if (rc > 0 && req == NULL) {
                         /* nice, this slave is valid */
                         LASSERT(req == NULL);
@@ -669,17 +706,18 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                         goto release_lock;
                 }
 
-                if (rc < 0)
+                if (rc < 0) {
+                        OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
                         /* error during revalidation */
                         GOTO(cleanup, rc);
-
+                }
                 if (master) {
                         LASSERT(master_valid == 0);
                         /* save lock on master to be returned to the caller */
                         CDEBUG(D_OTHER, "no lock on master yet\n");
                         memcpy(&master_lockh, lockh, sizeof(master_lockh));
-                        master_lock_mode = it.d.lustre.it_lock_mode;
-                        it.d.lustre.it_lock_mode = 0;
+                        master_lock_mode = LUSTRE_IT(&it)->it_lock_mode;
+                        LUSTRE_IT(&it)->it_lock_mode = 0;
                 } else {
                         /* this is slave. we want to control it */
                         lock = ldlm_handle2lock(lockh);
@@ -705,14 +743,15 @@ update:
                 
                 CDEBUG(D_OTHER, "fresh: %lu\n",
                        (unsigned long)obj->objs[i].size);
-
+                
                 if (req)
                         ptlrpc_req_finished(req);
 release_lock:
                 size += obj->objs[i].size;
 
-                if (it.d.lustre.it_lock_mode)
-                        ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
+                if (LUSTRE_IT(&it)->it_lock_mode)
+                        ldlm_lock_decref(lockh, LUSTRE_IT(&it)->it_lock_mode);
+                OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
         }
 
         if (*reqp) {
@@ -736,16 +775,16 @@ release_lock:
 //                        body->mds = id_group(&obj->id);
                 }
                 if (master_valid == 0) {
-                        memcpy(&oit->d.lustre.it_lock_handle,
+                        memcpy(&LUSTRE_IT(oit)->it_lock_handle,
                                &master_lockh, sizeof(master_lockh));
-                        oit->d.lustre.it_lock_mode = master_lock_mode;
+                        LUSTRE_IT(oit)->it_lock_mode = master_lock_mode;
                 }
                 rc = 0;
         } else {
                 /* it seems all the attrs are fresh and we did no request */
                 CDEBUG(D_OTHER, "all the attrs were fresh\n");
                 if (master_valid == 0)
-                        oit->d.lustre.it_lock_mode = master_lock_mode;
+                        LUSTRE_IT(oit)->it_lock_mode = master_lock_mode;
                 rc = 1;
         }