Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lustre / mds / handler.c
index ffaac22..d827236 100644 (file)
 #include <linux/lprocfs_status.h>
 #include <linux/lustre_commit_confd.h>
 
-#ifdef CONFIG_SNAPFS
-#include <linux/lustre_smfs.h>
-#include <linux/lustre_snap.h>
-#endif
 #include "mds_internal.h"
 
 static int mds_intent_policy(struct ldlm_namespace *ns,
@@ -122,7 +118,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
-                       OBD_FAIL_MDS_SENDPAGE, rc);
+                       OBD_FAIL_MDS_SENDPAGE, rc = -EIO);
                 GOTO(abort_bulk, rc);
         }
 
@@ -323,6 +319,13 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
         if (!inode)
                 RETURN(ERR_PTR(-ENOENT));
 
+        if (is_bad_inode(inode)) {
+                CERROR("bad inode returned %lu/%u\n",
+                       inode->i_ino, inode->i_generation);
+                dput(result);
+                RETURN(ERR_PTR(-ENOENT));
+        }
+
         /* here we disabled generation check, as root inode i_generation
          * of cache mds and real mds are different. */
         if (inode->i_ino != mds->mds_rootfid.id && generation &&
@@ -461,9 +464,11 @@ static int mds_destroy_export(struct obd_export *export)
                        dentry->d_name.len, dentry->d_name.name,
                        ll_bdevname(dentry->d_inode->i_sb, btmp),
                        dentry->d_inode->i_ino);
+                /* child inode->i_alloc_sem protects orphan_dec_test and
+                 * is_orphan race, mds_mfd_close drops it */
+                DOWN_WRITE_I_ALLOC_SEM(dentry->d_inode);
                 rc = mds_mfd_close(NULL, 0, obd, mfd,
                                    !(export->exp_flags & OBD_OPT_FAILOVER));
-
                 if (rc)
                         CDEBUG(D_INODE, "Error closing file: %d\n", rc);
                 spin_lock(&med->med_open_lock);
@@ -787,7 +792,7 @@ static int mds_getattr_pack_msg_cf(struct ptlrpc_request *req,
 
         rc = lustre_pack_reply(req, 1, size, NULL);
         if (rc) {
-                CERROR("out of memory\n");
+                CERROR("lustre_pack_reply failed: rc %d\n", rc);
                 GOTO(out, req->rq_status = rc);
         }
 
@@ -884,7 +889,7 @@ int mds_check_mds_num(struct obd_device *obd, struct inode* inode,
         RETURN(rc);
 }
 
-static int mds_getattr_name(struct ptlrpc_request *req, int offset,
+static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                             struct lustre_handle *child_lockh, int child_part)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
@@ -895,11 +900,10 @@ static int mds_getattr_name(struct ptlrpc_request *req, int offset,
         struct mds_body *body;
         struct dentry *dparent = NULL, *dchild = NULL;
         struct lvfs_ucred uc;
-        struct lustre_handle parent_lockh[2];
-        int namesize, update_mode;
-        int rc = 0, cleanup_phase = 0, resent_req = 0, reply_offset;
-        struct clonefs_info *clone_info = NULL;
-        char *name;
+        struct lustre_handle parent_lockh[2] = {{0}, {0}};
+        unsigned int namesize;
+        int rc = 0, cleanup_phase = 0, resent_req = 0, update_mode, reply_offset;
+        char *name = NULL;
         ENTRY;
 
         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
@@ -929,8 +933,13 @@ static int mds_getattr_name(struct ptlrpc_request *req, int offset,
         }
         namesize = req->rq_reqmsg->buflens[offset + 1];
 
+        /* namesize less than 2 means we have empty name, probably came from
+           revalidate by cfid, so no point in having name to be set */
+        if (namesize <= 1)
+                name = NULL;
+
         LASSERT (offset == 1 || offset == 3);
-        /* if requests were at offset 3, the getattr reply goes back at 1 */
+        /* if requests were at offset 2, the getattr reply goes back at 1 */
         if (offset == 3) {
                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
                 reply_offset = 1;
@@ -949,28 +958,11 @@ static int mds_getattr_name(struct ptlrpc_request *req, int offset,
         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
 
         LASSERT(namesize > 0);
-        if (namesize == 1) {
-                /* we have no dentry here, drop LOOKUP bit */
-                child_part &= ~MDS_INODELOCK_LOOKUP;
-                CDEBUG(D_OTHER, "%s: request to retrieve attrs for %lu/%lu\n",
-                       obd->obd_name, (unsigned long) body->fid1.id,
-                       (unsigned long) body->fid1.generation);
-                dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
-                                               parent_lockh, &update_mode, 
-                                               NULL, 0, child_part);
-                if (IS_ERR(dchild)) {
-                        CERROR("can't find inode: %d\n", (int) PTR_ERR(dchild));
-                        GOTO(cleanup, rc = PTR_ERR(dchild));
-                }
-                memcpy(child_lockh, parent_lockh, sizeof(parent_lockh[0]));
-#ifdef S_PDIROPS
-                if (parent_lockh[1].cookie)
-                        ldlm_lock_decref(parent_lockh + 1, update_mode);
-#endif
-                cleanup_phase = 2;
-                goto fill_inode;
+        if (child_lockh->cookie != 0) {
+                LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+                resent_req = 1;
         }
-        
+#if 0        
 #if HAVE_LOOKUP_RAW
         /* FIXME: handle raw lookup */
         if (body->valid == OBD_MD_FLID) {
@@ -1001,19 +993,51 @@ static int mds_getattr_name(struct ptlrpc_request *req, int offset,
                 GOTO(cleanup, rc);
         }
 #endif
-
-        if (child_lockh->cookie != 0) {
-                LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
-                resent_req = 1;
-        }
-
+#endif
         if (resent_req == 0) {
-                rc = mds_get_parent_child_locked(obd, mds, &body->fid1,
-                                                 parent_lockh, &dparent,
-                                                 LCK_PR, MDS_INODELOCK_LOOKUP,
-                                                 &update_mode, name, namesize,
-                                                 child_lockh, &dchild, LCK_PR,
-                                                 child_part, clone_info);
+                if (name) {
+                        rc = mds_get_parent_child_locked(obd, mds, &body->fid1,
+                                                         parent_lockh, &dparent,
+                                                         LCK_PR, 
+                                                         MDS_INODELOCK_LOOKUP,
+                                                         &update_mode, 
+                                                         name, namesize,
+                                                         child_lockh, &dchild, 
+                                                         LCK_PR, child_part);
+                } else {
+                        /* we have no dentry here, drop LOOKUP bit */
+                        /*FIXME: we need MDS_INODELOCK_LOOKUP or not*/
+                        child_part &= ~MDS_INODELOCK_LOOKUP;
+                        CDEBUG(D_OTHER, "%s: retrieve attrs for %lu/%lu\n",
+                               obd->obd_name, (unsigned long) body->fid1.id,
+                               (unsigned long) body->fid1.generation);
+
+#if 0
+                        dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL, 
+                                                       LCK_PR, parent_lockh, 
+                                                       &update_mode,
+                                                       NULL, 0, child_part);
+#else
+                        dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL,
+                                                       LCK_PR, parent_lockh,
+                                                       &update_mode,
+                                                       NULL, 0, 
+                                                       MDS_INODELOCK_UPDATE);
+#endif 
+                        if (IS_ERR(dchild)) {
+                                CERROR("can't find inode: %d\n", 
+                                       (int) PTR_ERR(dchild));
+                                GOTO(cleanup, rc = PTR_ERR(dchild));
+                        }
+                        memcpy(child_lockh, parent_lockh, 
+                               sizeof(parent_lockh[0]));
+#ifdef S_PDIROPS
+                        if (parent_lockh[1].cookie)
+                                ldlm_lock_decref(parent_lockh + 1, update_mode);
+#endif
+                        cleanup_phase = 2;
+                        goto fill_inode;
+                }
                 if (rc)
                         GOTO(cleanup, rc);
                 
@@ -1027,7 +1051,10 @@ static int mds_getattr_name(struct ptlrpc_request *req, int offset,
                 struct ldlm_resource *res;
                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
                 granted_lock = ldlm_handle2lock(child_lockh);
-                LASSERT(granted_lock);
+
+                LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
+                         body->fid1.id, body->fid1.generation,
+                         child_lockh->cookie);
 
                 res = granted_lock->l_resource;
                 child_fid.id = res->lr_name.name[0];
@@ -1071,14 +1098,13 @@ fill_inode:
                 if (resent_req == 0) {
                         if (rc && DENTRY_VALID(dchild))
                                 ldlm_lock_decref(child_lockh, LCK_PR);
-                        if (dparent) {
+                        if (name) {
                                 ldlm_lock_decref(parent_lockh, LCK_PR);
+                        }
 #ifdef S_PDIROPS
-                                if (parent_lockh[1].cookie != 0)
-                                        ldlm_lock_decref(parent_lockh + 1,
-                                                         update_mode);
+                        if (parent_lockh[1].cookie != 0)
+                                ldlm_lock_decref(parent_lockh + 1, update_mode);
 #endif
-                        }
                         if (dparent)
                                 l_dput(dparent);
                 }
@@ -1127,14 +1153,14 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
         de = mds_fid2dentry(mds, &body->fid1, NULL);
         if (IS_ERR(de)) {
-                rc = req->rq_status = -ENOENT;
-                GOTO(out_pop, PTR_ERR(de));
+                rc = req->rq_status = PTR_ERR(de);
+                GOTO(out_pop, rc);
         }
 
         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
         if (rc != 0) {
-                CERROR ("mds_getattr_pack_msg: %d\n", rc);
-                GOTO (out_pop, rc);
+                CERROR("mds_getattr_pack_msg: %d\n", rc);
+                GOTO(out_pop, rc);
         }
 
         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
@@ -1761,7 +1787,7 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         case MDS_STATFS:
         case MDS_GETSTATUS:
         case MDS_GETATTR:
-        case MDS_GETATTR_NAME:
+        case MDS_GETATTR_LOCK:
         case MDS_READPAGE:
         case MDS_REINT:
         case MDS_CLOSE:
@@ -1812,6 +1838,7 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         return rc;
 }
 
+static char str[PTL_NALFMT_SIZE];
 int mds_handle(struct ptlrpc_request *req)
 {
         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
@@ -1835,8 +1862,9 @@ int mds_handle(struct ptlrpc_request *req)
                 int recovering;
 
                 if (req->rq_export == NULL) {
-                        CERROR("lustre_mds: operation %d on unconnected MDS\n",
-                               req->rq_reqmsg->opc);
+                        CERROR("operation %d on unconnected MDS from NID %s\n",
+                               req->rq_reqmsg->opc,
+                               ptlrpc_peernid2str(&req->rq_peer, str));
                         req->rq_status = -ENOTCONN;
                         GOTO(out, rc = -ENOTCONN);
                 }
@@ -1847,12 +1875,13 @@ int mds_handle(struct ptlrpc_request *req)
 
                 /* sanity check: if the xid matches, the request must
                  * be marked as a resent or replayed */
-                if (req->rq_xid == med->med_mcd->mcd_last_xid)
+                if (req->rq_xid == med->med_mcd->mcd_last_xid) {
                         LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
                                  (MSG_RESENT | MSG_REPLAY),
                                  "rq_xid "LPU64" matches last_xid, "
                                  "expected RESENT flag\n",
                                  req->rq_xid);
+                }
                 /* else: note the opposite is not always true; a
                  * RESENT req after a failover will usually not match
                  * the last_xid, since it was likely never
@@ -1905,17 +1934,17 @@ int mds_handle(struct ptlrpc_request *req)
                 rc = mds_getattr(req, MDS_REQ_REC_OFF);
                 break;
 
-        case MDS_GETATTR_NAME: {
+        case MDS_GETATTR_LOCK: {
                 struct lustre_handle lockh;
                 DEBUG_REQ(D_INODE, req, "getattr_name");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
+                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_LOCK_NET, 0);
 
                 /* If this request gets a reconstructed reply, we won't be
-                 * acquiring any new locks in mds_getattr_name, so we don't
+                 * acquiring any new locks in mds_getattr_lock, so we don't
                  * want to cancel.
                  */
                 lockh.cookie = 0;
-                rc = mds_getattr_name(req, MDS_REQ_REC_OFF, &lockh,
+                rc = mds_getattr_lock(req, MDS_REQ_REC_OFF, &lockh,
                                       MDS_INODELOCK_UPDATE);
                 /* this non-intent call (from an ioctl) is special */
                 req->rq_status = rc;
@@ -2025,6 +2054,7 @@ int mds_handle(struct ptlrpc_request *req)
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
                                          ldlm_server_blocking_ast, NULL);
+                fail = OBD_FAIL_LDLM_REPLY;
                 break;
         case LDLM_CONVERT:
                 DEBUG_REQ(D_INODE, req, "convert");
@@ -2170,6 +2200,7 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         struct mds_obd *mds = &obd->u.mds;
         char *options = NULL;
         struct vfsmount *mnt;
+        char ns_name[48];
         unsigned long page;
         int rc = 0;
         ENTRY;
@@ -2260,8 +2291,9 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
         atomic_set(&mds->mds_real_clients, 0);
 
-        obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
-                                                LDLM_NAMESPACE_SERVER);
+        sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
+        obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+
         if (obd->obd_namespace == NULL) {
                 mds_cleanup(obd, 0);
                 GOTO(err_put, rc = -ENOMEM);
@@ -2270,7 +2302,8 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
 
         rc = mds_fs_setup(obd, mnt);
         if (rc) {
-                CERROR("MDS filesystem method init failed: rc = %d\n", rc);
+                CERROR("%s: MDS filesystem method init failed: rc = %d\n",
+                       obd->obd_name, rc);
                 GOTO(err_ns, rc);
         }
 
@@ -2384,7 +2417,8 @@ int mds_postrecov(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct llog_ctxt *ctxt;
-        int rc, item = 0;
+        int rc, item = 0, valsize;
+         __u32 group;
         ENTRY;
 
         LASSERT(!obd->obd_recovering);
@@ -2404,6 +2438,13 @@ int mds_postrecov(struct obd_device *obd)
                 GOTO(out, rc);
         item = rc;
 
+        group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+        valsize = sizeof(group);
+        rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
+                          valsize, &group);
+        if (rc)
+                GOTO(out, rc);
+
         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
                           NULL, NULL, NULL);
         if (rc) {
@@ -2531,6 +2572,7 @@ static int mds_cleanup(struct obd_device *obd, int flags)
 static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
                                         int offset,
                                         struct ldlm_lock *new_lock,
+                                        struct ldlm_lock **old_lock,
                                         struct lustre_handle *lockh)
 {
         struct obd_export *exp = req->rq_export;
@@ -2551,8 +2593,11 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
                         continue;
                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
                         lockh->cookie = lock->l_handle.h_cookie;
+                        LDLM_DEBUG(lock, "restoring lock cookie");
                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
                                   lockh->cookie);
+                        if (old_lock)
+                                *old_lock = LDLM_LOCK_GET(lock);
                         l_unlock(&obd->obd_namespace->ns_lock);
                         return;
                 }
@@ -2599,7 +2644,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
         struct ldlm_reply *rep;
         struct lustre_handle lockh[2] = {{0}, {0}};
-        struct ldlm_lock *new_lock;
+        struct ldlm_lock *new_lock = NULL;
         int getattr_part = MDS_INODELOCK_UPDATE;
         int rc, repsize[4] = { sizeof(struct ldlm_reply),
                                sizeof(struct mds_body),
@@ -2634,8 +2679,6 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
         intent_set_disposition(rep, DISP_IT_EXECD);
 
-        fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
-                                    lock, lockh);
 
         /* execute policy */
         switch ((long)it->opc) {
@@ -2643,6 +2686,8 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         case IT_CREAT|IT_OPEN:
                 /* XXX swab here to assert that an mds_open reint
                  * packet is following */
+                fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF, 
+                                            lock, NULL, lockh);
                 rep->lock_policy_res2 = mds_reint(req, offset, lockh);
 #if 0
                 /* We abort the lock if the lookup was negative and
@@ -2663,7 +2708,9 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         case IT_GETATTR:
                 getattr_part |= MDS_INODELOCK_LOOKUP;
         case IT_READDIR:
-                rep->lock_policy_res2 = mds_getattr_name(req, offset, lockh,
+                fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF, 
+                                            lock, &new_lock, lockh);
+                rep->lock_policy_res2 = mds_getattr_lock(req, offset, lockh,
                                                          getattr_part);
                 /* FIXME: LDLM can set req->rq_status. MDS sets
                    policy_res{1,2} with disposition and status.
@@ -2699,11 +2746,14 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
          * drop it below anyways because lock replay is done separately by the
          * client afterwards.  For regular RPCs we want to give the new lock to
          * the client instead of whatever lock it was about to get. */
-        new_lock = ldlm_handle2lock(&lockh[0]);
+        if (new_lock == NULL)
+                new_lock = ldlm_handle2lock(&lockh[0]);
         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
                 RETURN(0);
 
-        LASSERT(new_lock != NULL);
+        LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
+                 it->opc, lockh[0].cookie);
 
         /* If we've already given this lock to a client once, then we should
          * have no readers or writers.  Otherwise, we should have one reader