Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / mds / handler.c
index 58cfa20..259a6bc 100644 (file)
 #include <linux/init.h>
 #include <linux/obd_class.h>
 #include <linux/random.h>
+#include <linux/fs.h>
+#include <linux/jbd.h>
+#include <linux/ext3_fs.h>
 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
-#include <linux/smp_lock.h>
-#include <linux/buffer_head.h>
-#include <linux/workqueue.h>
-#include <linux/mount.h>
-#else 
-#include <linux/locks.h>
+# include <linux/smp_lock.h>
+# include <linux/buffer_head.h>
+# include <linux/workqueue.h>
+# include <linux/mount.h>
+#else
+# include <linux/locks.h>
 #endif
 #include <linux/obd_lov.h>
 #include <linux/lustre_mds.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/lprocfs_status.h>
-
-kmem_cache_t *mds_file_cache;
+#include "mds_internal.h"
 
 extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
                            struct obd_uuid *uuidarray);
 extern int mds_get_lovdesc(struct mds_obd  *obd, struct lov_desc *desc);
 int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
                        struct ptlrpc_request *req, int rc, int disp);
-static int mds_cleanup(struct obd_device * obddev);
+static int mds_cleanup(struct obd_device * obddev, int force, int failover);
 
 inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
 {
@@ -65,9 +67,13 @@ inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
 /*
  * Timeout callback handed to LWI_TIMEOUT() by mds_sendpage(); data is the
  * bulk descriptor being waited on.  As patched (the "+" lines) it logs the
  * client uuid and remote uuid of the descriptor's export, fails that export
  * with ptlrpc_fail_export(), aborts the bulk transfer and returns 1.
  *
  * NOTE(review): this patch removes ENTRY but leaves RETURN(1) in place --
  * confirm the debug macros do not need to be paired.
  */
 static int mds_bulk_timeout(void *data)
 {
         struct ptlrpc_bulk_desc *desc = data;
+        struct obd_export *exp = desc->bd_export;
 
-        ENTRY;
-        recovd_conn_fail(desc->bd_connection);
+        CERROR("bulk send timed out: evicting %s@%s\n",
+               exp->exp_client_uuid.uuid,
+               exp->exp_connection->c_remote_uuid.uuid);
+        ptlrpc_fail_export(exp);
+        ptlrpc_abort_bulk (desc);
         RETURN(1);
 }
 
@@ -76,39 +82,35 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
                         __u64 offset, __u64 xid)
 {
         struct ptlrpc_bulk_desc *desc;
-        struct ptlrpc_bulk_page *bulk;
         struct l_wait_info lwi;
-        char *buf;
+        struct page *page;
         int rc = 0;
         ENTRY;
 
-        desc = ptlrpc_prep_bulk(req->rq_connection);
+        LASSERT ((offset & (PAGE_CACHE_SIZE - 1)) == 0);
+
+        desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, MDS_BULK_PORTAL);
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        bulk = ptlrpc_prep_bulk_page(desc);
-        if (bulk == NULL)
+        LASSERT (PAGE_SIZE == PAGE_CACHE_SIZE);
+        page = alloc_pages (GFP_KERNEL, 0);
+        if (page == NULL)
                 GOTO(cleanup_bulk, rc = -ENOMEM);
 
-        OBD_ALLOC(buf, PAGE_CACHE_SIZE);
-        if (buf == NULL)
-                GOTO(cleanup_bulk, rc = -ENOMEM);
+        rc = ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
+        if (rc != 0)
+                GOTO(cleanup_buf, rc);
 
         CDEBUG(D_EXT2, "reading %lu@"LPU64" from dir %lu (size %llu)\n",
                PAGE_CACHE_SIZE, offset, file->f_dentry->d_inode->i_ino,
                file->f_dentry->d_inode->i_size);
-        rc = fsfilt_readpage(req->rq_export->exp_obd, file, buf,
+        rc = fsfilt_readpage(req->rq_export->exp_obd, file, page_address (page),
                              PAGE_CACHE_SIZE, (loff_t *)&offset);
 
         if (rc != PAGE_CACHE_SIZE)
                 GOTO(cleanup_buf, rc = -EIO);
 
-        bulk->bp_xid = xid;
-        bulk->bp_buf = buf;
-        bulk->bp_buflen = PAGE_CACHE_SIZE;
-        desc->bd_ptl_ev_hdlr = NULL;
-        desc->bd_portal = MDS_BULK_PORTAL;
-
         rc = ptlrpc_bulk_put(desc);
         if (rc)
                 GOTO(cleanup_buf, rc);
@@ -121,19 +123,17 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
         }
 
         lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
-        rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
-                          &lwi);
+        rc = l_wait_event(desc->bd_waitq, ptlrpc_bulk_complete (desc), &lwi);
         if (rc) {
-                if (rc != -ETIMEDOUT)
-                        LBUG();
+                LASSERT (rc == -ETIMEDOUT);
                 GOTO(cleanup_buf, rc);
         }
 
         EXIT;
  cleanup_buf:
-        OBD_FREE(buf, PAGE_SIZE);
+        __free_pages (page, 0);
  cleanup_bulk:
-        ptlrpc_bulk_decref(desc);
+        ptlrpc_free_bulk (desc);
  out:
         return rc;
 }
@@ -157,7 +157,7 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                               res_id, LDLM_PLAIN, NULL, 0, lock_mode,
                               &flags, ldlm_completion_ast,
-                              mds_blocking_ast, NULL, NULL, lockh);
+                              mds_blocking_ast, NULL, lockh);
         if (rc != ELDLM_OK) {
                 l_dput(de);
                 retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */
@@ -171,67 +171,52 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
 #endif
 
 
-
 /* Look up an entry by inode number. */
 /* this function ONLY returns valid dget'd dentries with an initialized inode
    or errors */
 /*
  * As patched: the inode number from fid->id is formatted as "0x%lx" and
  * looked up by name under mds->mds_fid_de; the old iget()/i_dentry walk is
  * removed.  Returns ERR_PTR(-ESTALE) for ino 0, the lookup's own error
  * pointer if the lookup fails, and ERR_PTR(-ENOENT) when the dentry has no
  * inode or when fid->generation is non-zero and differs from
  * inode->i_generation.  If mnt is non-NULL, *mnt is set to mds->mds_vfsmnt
  * and mntget() is called on it.
  */
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                               struct vfsmount **mnt)
 {
-        /* stolen from NFS */
-        struct super_block *sb = mds->mds_sb;
+        char fid_name[32];
         unsigned long ino = fid->id;
         __u32 generation = fid->generation;
         struct inode *inode;
-        struct list_head *lp;
         struct dentry *result;
 
         if (ino == 0)
                 RETURN(ERR_PTR(-ESTALE));
 
-        inode = iget(sb, ino);
-        if (inode == NULL)
-                RETURN(ERR_PTR(-ENOMEM));
+        snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
 
-        CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
+        /* under ext3 this is neither supposed to return bad inodes
+           nor NULL inodes. */
+        result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
+        if (IS_ERR(result))
+                RETURN(result);
 
-        if (is_bad_inode(inode) ||
-            (generation && inode->i_generation != generation)) {
+        inode = result->d_inode;
+        if (!inode)
+                RETURN(ERR_PTR(-ENOENT));
+        /* NOTE(review): the !inode return above does not dput(result),
+         * unlike the generation-mismatch path below -- confirm whether a
+         * dentry reference from ll_lookup_one_len() leaks here. */
+        CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino %lu, gen %u, sb %p\n",
+               inode->i_ino, inode->i_generation, inode->i_sb);
+
+        if (generation && inode->i_generation != generation) {
                 /* we didn't find the right inode.. */
-                CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
+                CERROR("bad inode %lu, link: %d ct: %d or generation %u/%u\n",
                        inode->i_ino, inode->i_nlink,
                        atomic_read(&inode->i_count), inode->i_generation,
                        generation);
-                iput(inode);
+                dput(result);
                 RETURN(ERR_PTR(-ENOENT));
         }
 
-        /* now to find a dentry. If possible, get a well-connected one */
-        if (mnt)
+        if (mnt) {
                 *mnt = mds->mds_vfsmnt;
-        spin_lock(&dcache_lock);
-        list_for_each(lp, &inode->i_dentry) {
-                result = list_entry(lp, struct dentry, d_alias);
-                if (!(result->d_flags & DCACHE_DISCONNECTED)) {
-                        dget_locked(result);
-                        result->d_vfs_flags |= DCACHE_REFERENCED;
-                        spin_unlock(&dcache_lock);
-                        iput(inode);
-                        if (mnt)
-                                mntget(*mnt);
-                        return result;
-                }
-        }
-        spin_unlock(&dcache_lock);
-        result = d_alloc_root(inode);
-        if (result == NULL) {
-                iput(inode);
-                return ERR_PTR(-ENOMEM);
-        }
-        if (mnt)
                 mntget(*mnt);
-        result->d_flags |= DCACHE_DISCONNECTED;
-        return result;
+        }
+
+        RETURN(result);
 }
 
 
@@ -242,13 +227,12 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
  * on the server, etc.
  */
 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
-                       struct obd_uuid *cluuid, struct recovd_obd *recovd,
-                       ptlrpc_recovery_cb_t recover)
+                       struct obd_uuid *cluuid)
 {
         struct obd_export *exp;
         struct mds_export_data *med;
         struct mds_client_data *mcd;
-        int rc;
+        int rc, abort_recovery;
         ENTRY;
 
         if (!conn || !obd || !cluuid)
@@ -256,9 +240,10 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         /* Check for aborted recovery. */
         spin_lock_bh(&obd->obd_processing_task_lock);
-        if (obd->obd_flags & OBD_ABORT_RECOVERY)
-                target_abort_recovery(obd);
+        abort_recovery = obd->obd_abort_recovery;
         spin_unlock_bh(&obd->obd_processing_task_lock);
+        if (abort_recovery)
+                target_abort_recovery(obd);
 
         /* XXX There is a small race between checking the list and adding a
          * new connection for the same UUID, but the real threat (list
@@ -276,6 +261,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
         exp = class_conn2export(conn);
         LASSERT(exp);
         med = &exp->exp_mds_data;
+        class_export_put(exp);
 
         OBD_ALLOC(mcd, sizeof(*mcd));
         if (!mcd) {
@@ -289,7 +275,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
         INIT_LIST_HEAD(&med->med_open_head);
         spin_lock_init(&med->med_open_lock);
 
-        rc = mds_client_add(&obd->u.mds, med, -1);
+        rc = mds_client_add(obd, &obd->u.mds, med, -1);
         if (rc)
                 GOTO(out_mcd, rc);
 
@@ -298,42 +284,116 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
 out_mcd:
         OBD_FREE(mcd, sizeof(*mcd));
 out_export:
-        class_disconnect(conn);
+        class_disconnect(conn, 0);
 
         return rc;
 }
 
+static void mds_mfd_addref(void *mfdp)
+{
+        struct mds_file_data *mfd = mfdp;
+
+        atomic_inc(&mfd->mfd_refcount);
+        CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd,
+               atomic_read(&mfd->mfd_refcount));
+}
+/*
+ * Allocate a new mds_file_data with OBD_ALLOC(), set its refcount to 2,
+ * initialise its handle list link and register the handle with
+ * class_handle_hash().  Returns the new object, or NULL (after logging an
+ * error) if the allocation fails.
+ */
+struct mds_file_data *mds_mfd_new(void)
+{
+        struct mds_file_data *mfd;
+
+        OBD_ALLOC(mfd, sizeof *mfd);
+        if (mfd == NULL) {
+                CERROR("mds: out of memory\n");
+                return NULL;
+        }
+        /* Starts at 2 -- presumably one reference for the handle hash
+         * (mds_mfd_destroy() unhashes and then puts) and one for the
+         * caller; TODO confirm. */
+        atomic_set(&mfd->mfd_refcount, 2);
+
+        INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
+        class_handle_hash(&mfd->mfd_handle, mds_mfd_addref);
+
+        return mfd;
+}
+/*
+ * Translate a lustre_handle into its mds_file_data by passing
+ * handle->cookie to class_handle2object().  handle must not be NULL
+ * (asserted).  The return value is whatever class_handle2object() yields;
+ * presumably NULL for an unknown cookie -- TODO confirm.
+ */
+static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
+{
+        ENTRY;
+        LASSERT(handle != NULL);
+        RETURN(class_handle2object(handle->cookie));
+}
+/*
+ * Drop one reference on mfd.  When the count reaches zero the object is
+ * freed with OBD_FREE(), after asserting that its handle link is empty.
+ * The count logged by CDEBUG is computed from a separate read taken before
+ * the decrement, so it is informational only.  The 0x5a5a upper bound in
+ * the sanity assertion presumably catches a poisoned (already freed)
+ * object -- TODO confirm.
+ */
+void mds_mfd_put(struct mds_file_data *mfd)
+{
+        CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
+               atomic_read(&mfd->mfd_refcount) - 1);
+        LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
+                atomic_read(&mfd->mfd_refcount) < 0x5a5a);
+        if (atomic_dec_and_test(&mfd->mfd_refcount)) {
+                LASSERT(list_empty(&mfd->mfd_handle.h_link));
+                OBD_FREE(mfd, sizeof *mfd);
+        }
+}
+/*
+ * Remove mfd's handle from the handle hash with class_handle_unhash() and
+ * then drop one reference via mds_mfd_put().  Whether that put frees the
+ * object depends on how many other references remain.
+ */
+void mds_mfd_destroy(struct mds_file_data *mfd)
+{
+        class_handle_unhash(&mfd->mfd_handle);
+        mds_mfd_put(mfd);
+}
+/*
+ * Close one open-file record.  As patched: unlink mfd from its list, take a
+ * reference on the parent dentry (if any) before the l_dput() of
+ * mfd->mfd_dentry and drop it afterwards, then hand the record to
+ * mds_mfd_destroy().  With filp_close() removed the function always
+ * returns 0.
+ *
+ * NOTE(review): RETURN() is used here without a matching ENTRY -- confirm
+ * the debug macros tolerate that.
+ */
 /* Call with med->med_open_lock held, please. */
-inline int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med)
+static int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med)
 {
-        struct file *file = mfd->mfd_file;
-        int rc;
         struct dentry *de = NULL;
-        LASSERT(file->private_data == mfd);
-
-        LASSERT(mfd->mfd_servercookie != DEAD_HANDLE_MAGIC);
 
+#ifdef CONFIG_SMP
+        LASSERT(spin_is_locked(&med->med_open_lock));
+#endif
         list_del(&mfd->mfd_list);
-        mfd->mfd_servercookie = DEAD_HANDLE_MAGIC;
-        kmem_cache_free(mds_file_cache, mfd);
 
-        if (file->f_dentry->d_parent) {
-                LASSERT(atomic_read(&file->f_dentry->d_parent->d_count));
-                de = dget(file->f_dentry->d_parent);
+        if (mfd->mfd_dentry->d_parent) {
+                LASSERT(atomic_read(&mfd->mfd_dentry->d_parent->d_count));
+                de = dget(mfd->mfd_dentry->d_parent);
         }
-        rc = filp_close(file, 0);
+
+        /* this is the actual "close" */
+        l_dput(mfd->mfd_dentry);
+
         if (de)
                 l_dput(de);
-        RETURN(rc);
+
+        mds_mfd_destroy(mfd);
+        RETURN(0);
 }
 
-static int mds_disconnect(struct lustre_handle *conn)
+static int mds_disconnect(struct lustre_handle *conn, int failover)
 {
         struct obd_export *export = class_conn2export(conn);
-        struct list_head *tmp, *n;
+        int rc;
+        unsigned long flags;
+        ENTRY;
+
+        ldlm_cancel_locks_for_export(export);
+
+        spin_lock_irqsave(&export->exp_lock, flags);
+        export->exp_failover = failover;
+        spin_unlock_irqrestore(&export->exp_lock, flags);
+
+        rc = class_disconnect(conn, failover);
+        class_export_put(export);
+
+        RETURN(rc);
+}
+
+static void mds_destroy_export(struct obd_export *export)
+{
         struct mds_export_data *med = &export->exp_mds_data;
+        struct list_head *tmp, *n;
         int rc;
+
         ENTRY;
+        LASSERT(!strcmp(export->exp_obd->obd_type->typ_name,
+                        LUSTRE_MDS_NAME));
 
         /*
          * Close any open files.
@@ -342,28 +402,39 @@ static int mds_disconnect(struct lustre_handle *conn)
         list_for_each_safe(tmp, n, &med->med_open_head) {
                 struct mds_file_data *mfd =
                         list_entry(tmp, struct mds_file_data, mfd_list);
-                CERROR("force closing client file handle for %*s\n",
-                       mfd->mfd_file->f_dentry->d_name.len,
-                       mfd->mfd_file->f_dentry->d_name.name);
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+                struct dentry *dentry = mfd->mfd_dentry;
+                CERROR("force closing client file handle for %*s (%s:%lu)\n",
+                       dentry->d_name.len, dentry->d_name.name,
+                       kdevname(dentry->d_inode->i_sb->s_dev),
+                       dentry->d_inode->i_ino);
+#endif
                 rc = mds_close_mfd(mfd, med);
                 if (rc)
                         CDEBUG(D_INODE, "Error closing file: %d\n", rc);
         }
         spin_unlock(&med->med_open_lock);
 
-        ldlm_cancel_locks_for_export(export);
-        if (med->med_outstanding_reply) {
+        if (export->exp_outstanding_reply) {
+                struct ptlrpc_request *req = export->exp_outstanding_reply;
+                unsigned long          flags;
+
                 /* Fake the ack, so the locks get cancelled. */
-                med->med_outstanding_reply->rq_flags &= ~PTL_RPC_FL_WANT_ACK;
-                med->med_outstanding_reply->rq_flags |= PTL_RPC_FL_ERR;
-                wake_up(&med->med_outstanding_reply->rq_wait_for_rep);
-                med->med_outstanding_reply = NULL;
-        }
-        mds_client_free(export);
+                LBUG ();
+                /* Actually we can't do this because it prevents us knowing
+                 * if the ACK callback ran or not */
+                spin_lock_irqsave (&req->rq_lock, flags);
+                req->rq_want_ack = 0;
+                req->rq_err = 1;
+                wake_up(&req->rq_wait_for_rep);
+                spin_unlock_irqrestore (&req->rq_lock, flags);
 
-        rc = class_disconnect(conn);
+                export->exp_outstanding_reply = NULL;
+        }
 
-        RETURN(rc);
+        if (!export->exp_failover)
+                mds_client_free(export);
+        EXIT;
 }
 
 /*
@@ -393,7 +464,7 @@ static int mds_getstatus(struct ptlrpc_request *req)
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
                 CERROR("mds: out of memory for message: size=%d\n", size);
-                req->rq_status = -ENOMEM;
+                req->rq_status = -ENOMEM;       /* superfluous? */
                 RETURN(-ENOMEM);
         }
 
@@ -404,7 +475,7 @@ static int mds_getstatus(struct ptlrpc_request *req)
          */
         mds_fsync_super(mds->mds_sb);
 
-        body = lustre_msg_buf(req->rq_repmsg, 0);
+        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
 
         /* the last_committed and last_xid fields are filled in for all
@@ -418,19 +489,28 @@ static int mds_getlovinfo(struct ptlrpc_request *req)
         struct mds_obd *mds = mds_req2mds(req);
         struct mds_status_req *streq;
         struct lov_desc *desc;
+        struct obd_uuid *uuid0;
         int tgt_count;
         int rc, size[2] = {sizeof(*desc)};
         ENTRY;
 
-        streq = lustre_msg_buf(req->rq_reqmsg, 0);
-        streq->flags = NTOH__u32(streq->flags);
-        streq->repbuf = NTOH__u32(streq->repbuf);
+        streq = lustre_swab_reqbuf (req, 0, sizeof (*streq),
+                                    lustre_swab_mds_status_req);
+        if (streq == NULL) {
+                CERROR ("Can't unpack mds_status_req\n");
+                RETURN (-EFAULT);
+        }
+
+        if (streq->repbuf > LOV_MAX_UUID_BUFFER_SIZE) {
+                CERROR ("Illegal request for uuid array > %d\n",
+                        streq->repbuf);
+                RETURN (-EINVAL);
+        }
         size[1] = streq->repbuf;
 
         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc) {
                 CERROR("mds: out of memory for message: size=%d\n", size[1]);
-                req->rq_status = -ENOMEM;
                 RETURN(-ENOMEM);
         }
 
@@ -439,18 +519,21 @@ static int mds_getlovinfo(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        desc = lustre_msg_buf(req->rq_repmsg, 0);
-        memcpy(desc, &mds->mds_lov_desc, sizeof *desc);
-        lov_packdesc(desc);
-        tgt_count = le32_to_cpu(desc->ld_tgt_count);
-        if (tgt_count * sizeof(struct obd_uuid) > streq->repbuf) {
+        /* XXX We're sending the lov_desc in my byte order.
+         * Receiver will swab... */
+        desc = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*desc));
+        memcpy(desc, &mds->mds_lov_desc, sizeof (*desc));
+
+        tgt_count = mds->mds_lov_desc.ld_tgt_count;
+        uuid0 = lustre_msg_buf (req->rq_repmsg, 1,
+                                tgt_count * sizeof (*uuid0));
+        if (uuid0 == NULL) {
                 CERROR("too many targets, enlarge client buffers\n");
                 req->rq_status = -ENOSPC;
                 RETURN(0);
         }
 
-        rc = mds_get_lovtgts(mds, tgt_count,
-                             lustre_msg_buf(req->rq_repmsg, 1));
+        rc = mds_get_lovtgts(mds, tgt_count, uuid0);
         if (rc) {
                 CERROR("get_lovtgts error %d\n", rc);
                 req->rq_status = rc;
@@ -507,17 +590,19 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg,
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lov_mds_md *lmm;
-        int lmm_size = msg->buflens[offset];
+        int lmm_size;
         int rc;
         ENTRY;
 
-        if (lmm_size == 0) {
+        lmm = lustre_msg_buf(msg, offset, 0);
+        if (lmm == NULL) {
+                /* Some problem with getting eadata when I sized the reply
+                 * buffer... */
                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
                        inode->i_ino);
                 RETURN(0);
         }
-
-        lmm = lustre_msg_buf(msg, offset);
+        lmm_size = msg->buflens[offset];
 
         /* I don't really like this, but it is a sanity check on the client
          * MD request.  However, if the client doesn't know how much space
@@ -529,15 +614,13 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg,
                 // RETURN(-EINVAL);
         }
 
-        /* We don't need to store the reply size, because this buffer is
-         * discarded right after unpacking, and the LOV can figure out the
-         * size itself from the ost count.
-         */
-        if ((rc = fsfilt_get_md(obd, inode, lmm, lmm_size)) < 0) {
-                CDEBUG(D_INFO, "No md for ino %lu: rc = %d\n",
-                       inode->i_ino, rc);
+        rc = fsfilt_get_md(obd, inode, lmm, lmm_size);
+        if (rc < 0) {
+                CERROR ("Error %d reading eadata for ino %lu\n",
+                        rc, inode->i_ino);
         } else if (rc > 0) {
                 body->valid |= OBD_MD_FLEASIZE;
+                body->eadatasize = rc;
                 rc = 0;
         }
 
@@ -556,24 +639,36 @@ static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
         if (inode == NULL)
                 RETURN(-ENOENT);
 
-        body = lustre_msg_buf(req->rq_repmsg, reply_off);
+        body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof (*body));
+        LASSERT (body != NULL);                 /* caller prepped reply */
 
         mds_pack_inode2fid(&body->fid1, inode);
         mds_pack_inode2body(body, inode);
 
-        if (S_ISREG(inode->i_mode) && reqbody->valid & OBD_MD_FLEASIZE) {
+        if (S_ISREG(inode->i_mode) &&
+            (reqbody->valid & OBD_MD_FLEASIZE) != 0) {
                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off + 1,
                                  body, inode);
-        } else if (S_ISLNK(inode->i_mode) && reqbody->valid & OBD_MD_LINKNAME) {
-                char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1);
-                int len = req->rq_repmsg->buflens[reply_off + 1];
+        } else if (S_ISLNK(inode->i_mode) &&
+                   (reqbody->valid & OBD_MD_LINKNAME) != 0) {
+                char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1, 0);
+                int len;
+
+                LASSERT (symname != NULL);       /* caller prepped reply */
+                len = req->rq_repmsg->buflens[reply_off + 1];
 
                 rc = inode->i_op->readlink(dentry, symname, len);
                 if (rc < 0) {
                         CERROR("readlink failed: %d\n", rc);
+                } else if (rc != len - 1) {
+                        CERROR ("Unexpected readlink rc %d: expecting %d\n",
+                                rc, len - 1);
+                        rc = -EINVAL;
                 } else {
                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
                         body->valid |= OBD_MD_LINKNAME;
+                        body->eadatasize = rc + 1;
+                        symname[rc] = 0;        /* NULL terminate */
                         rc = 0;
                 }
         }
@@ -588,9 +683,12 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
         int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
         ENTRY;
 
-        body = lustre_msg_buf(req->rq_reqmsg, offset);
+        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
+        LASSERT (body != NULL);                 /* checked by caller */
+        LASSERT_REQSWABBED (req, offset);       /* swabbed by caller */
 
-        if (S_ISREG(inode->i_mode) && body->valid & OBD_MD_FLEASIZE) {
+        if (S_ISREG(inode->i_mode) &&
+            (body->valid & OBD_MD_FLEASIZE) != 0) {
                 int rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0);
                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
                        rc, inode->i_ino);
@@ -606,11 +704,15 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
                 } else
                         size[bufcount] = rc;
                 bufcount++;
-        } else if (body->valid & OBD_MD_LINKNAME) {
-                size[bufcount] = MIN(inode->i_size + 1, body->size);
+        } else if (S_ISLNK (inode->i_mode) &&
+                   (body->valid & OBD_MD_LINKNAME) != 0) {
+                if (inode->i_size + 1 != body->eadatasize)
+                        CERROR ("symlink size: %Lu, reply space: %d\n",
+                                inode->i_size + 1, body->eadatasize);
+                size[bufcount] = MIN(inode->i_size + 1, body->eadatasize);
                 bufcount++;
-                CDEBUG(D_INODE, "symlink size: %Lu, reply space: "LPU64"\n",
-                       inode->i_size + 1, body->size);
+                CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
+                       inode->i_size + 1, body->eadatasize);
         }
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
@@ -636,8 +738,6 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
 static void reconstruct_getattr_name(int offset, struct ptlrpc_request *req,
                                      struct lustre_handle *client_lockh)
 {
-        struct mds_export_data *med = &req->rq_export->exp_mds_data;
-        struct mds_client_data *mcd = med->med_mcd;
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_obd *mds = mds_req2mds(req);
         struct dentry *parent, *child;
@@ -648,18 +748,19 @@ static void reconstruct_getattr_name(int offset, struct ptlrpc_request *req,
         int namelen, rc = 0;
         char *name;
 
-        req->rq_transno = mcd->mcd_last_transno;
-        req->rq_status = mcd->mcd_last_result;
-
-        if (med->med_outstanding_reply)
-                mds_steal_ack_locks(med, req);
+        if (req->rq_export->exp_outstanding_reply)
+                mds_steal_ack_locks(req->rq_export, req);
 
-        if (req->rq_status)
-                return;
+        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
+        LASSERT (body != NULL);                 /* checked by caller */
+        LASSERT_REQSWABBED (req, offset);       /* swabbed by caller */
 
-        body = lustre_msg_buf(req->rq_reqmsg, offset);
-        name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
+        name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
+        LASSERT (name != NULL);                 /* checked by caller */
+        LASSERT_REQSWABBED (req, offset + 1);   /* swabbed by caller */
         namelen = req->rq_reqmsg->buflens[offset + 1];
+
+        LASSERT (offset == 2 || offset == 0);
         /* requests were at offset 2, replies go back at 1 */
         if (offset)
                 offset = 1;
@@ -674,19 +775,17 @@ static void reconstruct_getattr_name(int offset, struct ptlrpc_request *req,
         LASSERT(!IS_ERR(parent));
         dir = parent->d_inode;
         LASSERT(dir);
-        child = lookup_one_len(name, parent, namelen - 1);
+        child = ll_lookup_one_len(name, parent, namelen - 1);
         LASSERT(!IS_ERR(child));
 
-        if (!med->med_outstanding_reply) {
-                /* XXX need to enqueue client lock */
-                LBUG();
+        if (req->rq_repmsg == NULL) {
+                rc = mds_getattr_pack_msg(req, child->d_inode, offset);
+                /* XXX need to handle error here */
+                LASSERT (rc == 0);
         }
 
-        if (req->rq_repmsg == NULL)
-                mds_getattr_pack_msg(req, child->d_inode, offset);
-        
         rc = mds_getattr_internal(obd, child, req, body, offset);
-        LASSERT(!rc);
+        req->rq_status = rc;
         l_dput(child);
         l_dput(parent);
 }
@@ -703,24 +802,41 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         struct obd_ucred uc;
         struct ldlm_res_id child_res_id = { .name = {0} };
         struct lustre_handle parent_lockh;
-        int namelen, flags = 0, rc = 0, cleanup_phase = 0;
+        int namesize;
+        int flags = 0, rc = 0, cleanup_phase = 0, req_was_resent;
         char *name;
         ENTRY;
 
         LASSERT(!strcmp(obd->obd_type->typ_name, "mds"));
 
-        MDS_CHECK_RESENT(req, 
-                         reconstruct_getattr_name(offset, req, child_lockh));
+        /* Swab now, before anyone looks inside the request */
 
-        if (req->rq_reqmsg->bufcount <= offset + 1) {
-                LBUG();
-                GOTO(cleanup, rc = -EINVAL);
+        body = lustre_swab_reqbuf (req, offset, sizeof (*body),
+                                   lustre_swab_mds_body);
+        if (body == NULL) {
+                CERROR ("Can't swab mds_body\n");
+                GOTO (cleanup, rc = -EFAULT);
         }
 
-        body = lustre_msg_buf(req->rq_reqmsg, offset);
-        name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
-        namelen = req->rq_reqmsg->buflens[offset + 1];
-        /* requests were at offset 2, replies go back at 1 */
+        LASSERT_REQSWAB (req, offset + 1);
+        name = lustre_msg_string (req->rq_reqmsg, offset + 1, 0);
+        if (name == NULL) {
+                CERROR ("Can't unpack name\n");
+                GOTO (cleanup, rc = -EFAULT);
+        }
+        namesize = req->rq_reqmsg->buflens[offset + 1];
+
+        req_was_resent = lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT;
+        if (child_lockh->cookie) {
+                LASSERT(req_was_resent);
+                reconstruct_getattr_name(offset, req, child_lockh);
+                RETURN(0);
+        } else if (req_was_resent) {
+                DEBUG_REQ(D_HA, req, "no reply for RESENT req");
+        }
+
+        LASSERT (offset == 0 || offset == 2);
+        /* if requests were at offset 2, replies go back at 1 */
         if (offset)
                 offset = 1;
 
@@ -740,10 +856,10 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
 
         cleanup_phase = 1; /* parent dentry and lock */
 
-        CDEBUG(D_INODE, "parent ino %lu, name %*s\n", dir->i_ino,namelen,name);
+        CDEBUG(D_INODE, "parent ino %lu, name %s\n", dir->i_ino, name);
 
         /* Step 2: Lookup child */
-        dchild = lookup_one_len(name, de, namelen - 1);
+        dchild = ll_lookup_one_len(name, de, namesize - 1);
         if (IS_ERR(dchild)) {
                 CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
                 GOTO(cleanup, rc = PTR_ERR(dchild));
@@ -761,7 +877,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                               child_res_id, LDLM_PLAIN, NULL, 0, LCK_PR,
                               &flags, ldlm_completion_ast, mds_blocking_ast,
-                              NULL, NULL, child_lockh);
+                              NULL, child_lockh);
         if (rc != ELDLM_OK) {
                 CERROR("ldlm_cli_enqueue: %d\n", rc);
                 GOTO(cleanup, rc = -EIO);
@@ -769,15 +885,18 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
 
         cleanup_phase = 3; /* child lock */
 
-        if (req->rq_repmsg == NULL)
-                mds_getattr_pack_msg(req, dchild->d_inode, offset);
+        if (req->rq_repmsg == NULL) {
+                rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
+                if (rc != 0) {
+                        CERROR ("mds_getattr_pack_msg: %d\n", rc);
+                        GOTO (cleanup, rc);
+                }
+        }
 
         rc = mds_getattr_internal(obd, dchild, req, body, offset);
         GOTO(cleanup, rc); /* returns the lock to the client */
-        
+
  cleanup:
-        rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, NULL,
-                                req, rc, 0);
         switch (cleanup_phase) {
         case 3:
                 if (rc)
@@ -812,7 +931,13 @@ static int mds_getattr(int offset, struct ptlrpc_request *req)
         int rc = 0;
         ENTRY;
 
-        body = lustre_msg_buf(req->rq_reqmsg, offset);
+        body = lustre_swab_reqbuf (req, offset, sizeof (*body),
+                                   lustre_swab_mds_body);
+        if (body == NULL) {
+                CERROR ("Can't unpack body\n");
+                RETURN (-EFAULT);
+        }
+
         uc.ouc_fsuid = body->fsuid;
         uc.ouc_fsgid = body->fsgid;
         uc.ouc_cap = body->capability;
@@ -824,6 +949,10 @@ static int mds_getattr(int offset, struct ptlrpc_request *req)
         }
 
         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
+        if (rc != 0) {
+                CERROR ("mds_getattr_pack_msg: %d\n", rc);
+                GOTO (out_pop, rc);
+        }
 
         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
 
@@ -847,13 +976,12 @@ static int mds_statfs(struct ptlrpc_request *req)
                 GOTO(out, rc);
         }
 
-        osfs = lustre_msg_buf(req->rq_repmsg, 0);
+        osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*osfs));
         rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, osfs);
         if (rc) {
                 CERROR("mds: statfs failed: rc %d\n", rc);
                 GOTO(out, rc);
         }
-        obd_statfs_pack(osfs, osfs);
 
         EXIT;
 out:
@@ -861,69 +989,6 @@ out:
         return 0;
 }
 
-static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
-{
-        struct mds_file_data *mfd = NULL;
-        ENTRY;
-
-        if (!handle || !handle->addr)
-                RETURN(NULL);
-
-        mfd = (struct mds_file_data *)(unsigned long)(handle->addr);
-        if (!kmem_cache_validate(mds_file_cache, mfd))
-                RETURN(NULL);
-
-        if (mfd->mfd_servercookie != handle->cookie)
-                RETURN(NULL);
-
-        RETURN(mfd);
-}
-
-#if 0
-
-static int mds_store_md(struct mds_obd *mds, struct ptlrpc_request *req,
-                        int offset, struct mds_body *body, struct inode *inode)
-{
-        struct obd_device *obd = req->rq_export->exp_obd;
-        struct lov_mds_md *lmm = lustre_msg_buf(req->rq_reqmsg, offset);
-        int lmm_size = req->rq_reqmsg->buflens[offset];
-        struct obd_run_ctxt saved;
-        struct obd_ucred uc;
-        void *handle;
-        int rc, rc2;
-        ENTRY;
-
-        /* I don't really like this, but it is a sanity check on the client
-         * MD request.
-         */
-        if (lmm_size > mds->mds_max_mdsize) {
-                CERROR("Saving MD for inode %lu of %d bytes > max %d\n",
-                       inode->i_ino, lmm_size, mds->mds_max_mdsize);
-                //RETURN(-EINVAL);
-        }
-
-        CDEBUG(D_INODE, "storing %d bytes MD for inode %lu\n",
-               lmm_size, inode->i_ino);
-        uc.ouc_fsuid = body->fsuid;
-        uc.ouc_fsgid = body->fsgid;
-        uc.ouc_cap = body->capability;
-        push_ctxt(&saved, &mds->mds_ctxt, &uc);
-        handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
-        if (IS_ERR(handle)) {
-                rc = PTR_ERR(handle);
-                GOTO(out_ea, rc);
-        }
-
-        rc = fsfilt_set_md(obd, inode,handle,lmm,lmm_size);
-        rc = mds_finish_transno(mds, inode, handle, req, rc, 0);
-out_ea:
-        pop_ctxt(&saved, &mds->mds_ctxt, &uc);
-
-        RETURN(rc);
-}
-
-#endif
-
 static void reconstruct_close(struct ptlrpc_request *req)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
@@ -948,13 +1013,17 @@ static int mds_close(struct ptlrpc_request *req)
 
         MDS_CHECK_RESENT(req, reconstruct_close(req));
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        body = lustre_swab_reqbuf(req, 0, sizeof (*body),
+                                  lustre_swab_mds_body);
+        if (body == NULL) {
+                CERROR ("Can't unpack body\n");
+                RETURN (-EFAULT);
+        }
 
         mfd = mds_handle2mfd(&body->handle);
         if (mfd == NULL) {
                 DEBUG_REQ(D_ERROR, req, "no handle for file close "LPD64
-                          ": addr "LPX64", cookie "LPX64"\n",
-                          body->fid1.id, body->handle.addr,
+                          ": cookie "LPX64"\n", body->fid1.id,
                           body->handle.cookie);
                 RETURN(-ESTALE);
         }
@@ -966,6 +1035,7 @@ static int mds_close(struct ptlrpc_request *req)
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
                 CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n");
                 req->rq_status = -ENOMEM;
+                mds_mfd_put(mfd);
                 RETURN(-ENOMEM);
         }
 
@@ -975,6 +1045,7 @@ static int mds_close(struct ptlrpc_request *req)
                 req->rq_status = rc;
         }
 
+        mds_mfd_put(mfd);
         RETURN(0);
 }
 
@@ -986,7 +1057,7 @@ static int mds_readpage(struct ptlrpc_request *req)
         struct file *file;
         struct mds_body *body, *repbody;
         struct obd_run_ctxt saved;
-        int rc, size = sizeof(*body);
+        int rc, size = sizeof(*repbody);
         struct obd_ucred uc;
         ENTRY;
 
@@ -996,7 +1067,23 @@ static int mds_readpage(struct ptlrpc_request *req)
                 GOTO(out, rc = -ENOMEM);
         }
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        body = lustre_swab_reqbuf (req, 0, sizeof (*body),
+                                   lustre_swab_mds_body);
+        if (body == NULL)
+                GOTO (out, rc = -EFAULT);
+
+        /* body->size is actually the offset -eeb */
+        if ((body->size & (PAGE_SIZE - 1)) != 0) {
+                CERROR ("offset "LPU64" not on a page boundary\n", body->size);
+                GOTO (out, rc = -EFAULT);
+        }
+
+        /* body->nlink is actually the #bytes to read -eeb */
+        if (body->nlink != PAGE_SIZE) {
+                CERROR ("size %d is not PAGE_SIZE\n", body->nlink);
+                GOTO (out, rc = -EFAULT);
+        }
+
         uc.ouc_fsuid = body->fsuid;
         uc.ouc_fsgid = body->fsgid;
         uc.ouc_cap = body->capability;
@@ -1012,7 +1099,7 @@ static int mds_readpage(struct ptlrpc_request *req)
         if (IS_ERR(file))
                 GOTO(out_pop, rc = PTR_ERR(file));
 
-        repbody = lustre_msg_buf(req->rq_repmsg, 0);
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
         repbody->size = file->f_dentry->d_inode->i_size;
         repbody->valid = OBD_MD_FLSIZE;
 
@@ -1020,6 +1107,7 @@ static int mds_readpage(struct ptlrpc_request *req)
            doesn't send a reply when this function completes. Instead a
            callback function would send the reply */
         /* body->blocks is actually the xid -phil */
+        /* body->size is actually the offset -eeb */
         rc = mds_sendpage(req, file, body->size, body->blocks);
 
         filp_close(file, 0);
@@ -1057,12 +1145,15 @@ static int filter_recovery_request(struct ptlrpc_request *req,
 {
         switch (req->rq_reqmsg->opc) {
         case MDS_CONNECT: /* This will never get here, but for completeness. */
+        case OST_CONNECT: /* This will never get here, but for completeness. */
         case MDS_DISCONNECT:
+        case OST_DISCONNECT:
                *process = 1;
                RETURN(0);
 
         case MDS_CLOSE:
         case MDS_GETSTATUS: /* used in unmounting */
+        case OBD_PING:
         case MDS_REINT:
         case LDLM_ENQUEUE:
                 *process = target_queue_recovery_request(req, obd);
@@ -1072,7 +1163,8 @@ static int filter_recovery_request(struct ptlrpc_request *req,
                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
                 *process = 0;
                 /* XXX what should we set rq_status to here? */
-                RETURN(ptlrpc_error(req->rq_svc, req));
+                req->rq_status = -EAGAIN;
+                RETURN(ptlrpc_error(req));
         }
 }
 
@@ -1085,106 +1177,42 @@ static char *reint_names[] = {
         [REINT_OPEN]    "open",
 };
 
-void mds_steal_ack_locks(struct mds_export_data *med,
+void mds_steal_ack_locks(struct obd_export *exp,
                          struct ptlrpc_request *req)
 {
-        struct ptlrpc_request *oldrep = med->med_outstanding_reply;
+        unsigned long  flags;
+        /* oldrep's flag bits are guarded by oldrep's rq_lock, not req's */
+        struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
         memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
                sizeof req->rq_ack_locks);
-        oldrep->rq_flags |= PTL_RPC_FL_RESENT;
+        spin_lock_irqsave (&oldrep->rq_lock, flags);
+        oldrep->rq_resent = 1;
         wake_up(&oldrep->rq_wait_for_rep);
+        spin_unlock_irqrestore (&oldrep->rq_lock, flags);
         DEBUG_REQ(D_HA, oldrep, "stole locks from");
         DEBUG_REQ(D_HA, req, "stole locks for");
 }
 
-static void mds_send_reply(struct ptlrpc_request *req, int rc)
-{
-        int i;
-        struct ptlrpc_req_ack_lock *ack_lock;
-        struct l_wait_info lwi;
-        struct mds_export_data *med =
-                (req->rq_export && req->rq_ack_locks[0].mode) ?
-                &req->rq_export->exp_mds_data : NULL;
-
-        if (med) {
-                med->med_outstanding_reply = req;
-                req->rq_flags |= PTL_RPC_FL_WANT_ACK;
-                init_waitqueue_head(&req->rq_wait_for_rep);
-        }
-
-        if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_ALL_REPLY_NET | OBD_FAIL_ONCE)) {
-                if (rc) {
-                        DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
-                        ptlrpc_error(req->rq_svc, req);
-                } else {
-                        DEBUG_REQ(D_NET, req, "sending reply");
-                        ptlrpc_reply(req->rq_svc, req);
-                }
-        } else {
-                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
-                DEBUG_REQ(D_ERROR, req, "dropping reply");
-                if (!med && req->rq_repmsg)
-                        OBD_FREE(req->rq_repmsg, req->rq_replen);
-        }
-
-        if (!med) {
-                DEBUG_REQ(D_HA, req, "not waiting for ack");
-                return;
-        }
-
-        lwi = LWI_TIMEOUT(obd_timeout / 2 * HZ, NULL, NULL);
-        rc = l_wait_event(req->rq_wait_for_rep, 
-                          (req->rq_flags & PTL_RPC_FL_WANT_ACK) == 0 ||
-                          (req->rq_flags & PTL_RPC_FL_RESENT),
-                          &lwi);
-
-        if (req->rq_flags & PTL_RPC_FL_RESENT) {
-                /* The client resent this request, so abort the
-                 * waiting-ack portals stuff, and don't decref the
-                 * locks.
-                 */
-                DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
-                ptlrpc_abort(req);
-                return;
-        }
-
-        if (rc == -ETIMEDOUT) {
-                ptlrpc_abort(req);
-                recovd_conn_fail(req->rq_export->exp_connection);
-                DEBUG_REQ(D_HA, req, "cancelling locks for timeout");
-        } else {
-                DEBUG_REQ(D_HA, req, "cancelling locks for ack");
-        }
-        
-        med->med_outstanding_reply = NULL;
-        
-        for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
-                if (!ack_lock->mode)
-                        break;
-                ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
-        }
-}
-
 int mds_handle(struct ptlrpc_request *req)
 {
-        int should_process, rc;
+        int should_process;
+        int rc = 0;
         struct mds_obd *mds = NULL; /* quell gcc overwarning */
         struct obd_device *obd = NULL;
         ENTRY;
 
-        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
-                DEBUG_REQ(D_ERROR, req, "invalid request (%d)", rc);
-                GOTO(out, rc);
-        }
-
         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
 
         LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
 
+        /* XXX identical to OST */
         if (req->rq_reqmsg->opc != MDS_CONNECT) {
                 struct mds_export_data *med;
+                int recovering, abort_recovery;
+
                 if (req->rq_export == NULL) {
+                        CERROR("lustre_mds: operation %d on unconnected MDS\n",
+                               req->rq_reqmsg->opc);
                         req->rq_status = -ENOTCONN;
                         GOTO(out, rc = -ENOTCONN);
                 }
@@ -1192,12 +1220,15 @@ int mds_handle(struct ptlrpc_request *req)
                 med = &req->rq_export->exp_mds_data;
                 obd = req->rq_export->exp_obd;
                 mds = &obd->u.mds;
+
+                /* Check for aborted recovery. */
                 spin_lock_bh(&obd->obd_processing_task_lock);
-                if (obd->obd_flags & OBD_ABORT_RECOVERY)
-                        target_abort_recovery(obd);
+                abort_recovery = obd->obd_abort_recovery;
+                recovering = obd->obd_recovering;
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-
-                if (obd->obd_flags & OBD_RECOVERING) {
+                if (abort_recovery) {
+                        target_abort_recovery(obd);
+                } else if (recovering) {
                         rc = filter_recovery_request(req, obd, &should_process);
                         if (rc || !should_process)
                                 RETURN(rc);
@@ -1224,7 +1255,7 @@ int mds_handle(struct ptlrpc_request *req)
                 /* Make sure that last_rcvd is correct. */
                 if (!rc)
                         mds_fsync_super(mds->mds_sb);
-                req->rq_status = rc;
+                req->rq_status = rc;            /* superfluous? */
                 break;
 
         case MDS_GETSTATUS:
@@ -1253,9 +1284,9 @@ int mds_handle(struct ptlrpc_request *req)
                  * acquiring any new locks in mds_getattr_name, so we don't
                  * want to cancel.
                  */
-                lockh.addr = 0;
+                lockh.cookie = 0;
                 rc = mds_getattr_name(0, req, &lockh);
-                if (rc == 0 && lockh.addr)
+                if (rc == 0 && lockh.cookie)
                         ldlm_lock_decref(&lockh, LCK_PR);
                 break;
         }
@@ -1275,13 +1306,24 @@ int mds_handle(struct ptlrpc_request *req)
                 break;
 
         case MDS_REINT: {
-                int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0);
+                __u32 *opcp = lustre_msg_buf (req->rq_reqmsg, 0, sizeof (*opcp));
+                __u32  opc;
                 int size[2] = {sizeof(struct mds_body), mds->mds_max_mdsize};
                 int bufcount;
 
-                DEBUG_REQ(D_INODE, req, "reint (%s%s)",
-                          reint_names[opc & REINT_OPCODE_MASK],
-                          opc & REINT_REPLAYING ? "|REPLAYING" : "");
+                /* NB only peek inside req now; mds_reint() will swab it */
+                if (opcp == NULL) {
+                        CERROR ("Can't inspect opcode\n");
+                        rc = -EINVAL;
+                        break;
+                }
+                opc = *opcp;
+                if (lustre_msg_swabbed (req->rq_reqmsg))
+                        __swab32s (&opc);
+                /* only index reint_names[] with an in-range, named opcode */
+                DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
+                          (opc < sizeof (reint_names) / sizeof (reint_names[0]) &&
+                           reint_names[opc] != NULL) ? reint_names[opc] : "unknown opcode");
 
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
 
@@ -1306,6 +1348,11 @@ int mds_handle(struct ptlrpc_request *req)
                 rc = mds_close(req);
                 break;
 
+        case OBD_PING:
+                DEBUG_REQ(D_INODE, req, "ping");
+                rc = target_handle_ping(req);
+                break;
+
         case LDLM_ENQUEUE:
                 DEBUG_REQ(D_INODE, req, "enqueue");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
@@ -1325,7 +1372,8 @@ int mds_handle(struct ptlrpc_request *req)
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
                 break;
         default:
-                rc = ptlrpc_error(req->rq_svc, req);
+                req->rq_status = -ENOTSUPP;
+                rc = ptlrpc_error(req);
                 RETURN(rc);
         }
 
@@ -1337,10 +1385,11 @@ int mds_handle(struct ptlrpc_request *req)
                 struct obd_device *obd = list_entry(mds, struct obd_device,
                                                     u.mds);
                 req->rq_repmsg->last_xid =
-                        HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
-                if ((obd->obd_flags & OBD_NO_TRANSNO) == 0) {
+                        le64_to_cpu (med->med_mcd->mcd_last_xid);
+
+                if (!obd->obd_no_transno) {
                         req->rq_repmsg->last_committed =
-                                HTON__u64(obd->obd_last_committed);
+                                obd->obd_last_committed;
                 } else {
                         DEBUG_REQ(D_IOCTL, req,
                                   "not sending last_committed update");
@@ -1348,12 +1397,12 @@ int mds_handle(struct ptlrpc_request *req)
                 CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
                        ", xid "LPU64"\n",
                        mds->mds_last_transno, obd->obd_last_committed,
-                       NTOH__u64(req->rq_xid));
+                       req->rq_xid);
         }
  out:
 
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
-                if (obd && (obd->obd_flags & OBD_RECOVERING)) {
+                if (obd && obd->obd_recovering) {
                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
                         return target_queue_final_reply(req, rc);
                 }
@@ -1361,7 +1410,7 @@ int mds_handle(struct ptlrpc_request *req)
                 rc = req->rq_status = -ENOTCONN;
         }
 
-        mds_send_reply(req, rc);
+        target_send_reply(req, rc, OBD_FAIL_MDS_ALL_REPLY_NET);
         return 0;
 }
 
@@ -1414,8 +1463,10 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         struct mds_obd *mds = &obddev->u.mds;
         struct vfsmount *mnt;
         int rc = 0;
+        unsigned long page;
         ENTRY;
 
+
 #ifdef CONFIG_DEV_RDONLY
         dev_clear_rdonly(2);
 #endif
@@ -1426,7 +1477,15 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (IS_ERR(obddev->obd_fsops))
                 RETURN(rc = PTR_ERR(obddev->obd_fsops));
 
-        mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
+        if (!(page = __get_free_page(GFP_KERNEL)))
+                GOTO(err_ops, rc = -ENOMEM); /* unwind, don't bare-return */
+
+        memset((void *)page, 0, PAGE_SIZE);
+        sprintf((char *)page, "iopen_nopriv");
+
+        mnt = do_kern_mount(data->ioc_inlbuf2, 0,
+                            data->ioc_inlbuf1, (void *)page);
+        free_page(page);
         if (IS_ERR(mnt)) {
                 rc = PTR_ERR(mnt);
                 CERROR("do_kern_mount failed: rc = %d\n", rc);
@@ -1449,7 +1508,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         obddev->obd_namespace =
                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
         if (obddev->obd_namespace == NULL) {
-                mds_cleanup(obddev);
+                mds_cleanup(obddev, 0, 0);
                 GOTO(err_fs, rc = -ENOMEM);
         }
 
@@ -1461,7 +1520,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         RETURN(0);
 
 err_fs:
-        mds_fs_cleanup(obddev);
+        mds_fs_cleanup(obddev, 0);
 err_put:
         unlock_kernel();
         mntput(mds->mds_vfsmnt);
@@ -1472,7 +1531,7 @@ err_ops:
         return rc;
 }
 
-static int mds_cleanup(struct obd_device *obddev)
+static int mds_cleanup(struct obd_device *obddev, int force, int failover)
 {
         struct super_block *sb;
         struct mds_obd *mds = &obddev->u.mds;
@@ -1483,14 +1542,25 @@ static int mds_cleanup(struct obd_device *obddev)
                 RETURN(0);
 
         mds_update_server_data(mds);
-        mds_fs_cleanup(obddev);
+        mds_fs_cleanup(obddev, failover);
 
         unlock_kernel();
+
+        /* 2 seems normal on mds, (may_umount() also expects 2
+          fwiw), but we only see 1 at this point in obdfilter. */
+        if (atomic_read(&obddev->u.mds.mds_vfsmnt->mnt_count) > 2){
+                CERROR("%s: mount point busy, mnt_count: %d\n",
+                       obddev->obd_name,
+                       atomic_read(&obddev->u.mds.mds_vfsmnt->mnt_count));
+        }
+
         mntput(mds->mds_vfsmnt);
         mds->mds_sb = 0;
 
         ldlm_namespace_free(obddev->obd_namespace);
 
+        if (obddev->obd_recovering)
+                target_cancel_recovery_timer(obddev);
         lock_kernel();
 #ifdef CONFIG_DEV_RDONLY
         dev_clear_rdonly(2);
@@ -1503,18 +1573,32 @@ static int mds_cleanup(struct obd_device *obddev)
 inline void fixup_handle_for_resent_req(struct ptlrpc_request *req,
                                         struct lustre_handle *lockh)
 {
-        struct mds_export_data *med = &req->rq_export->exp_mds_data;
-        struct mds_client_data *mcd = med->med_mcd;
-        struct ptlrpc_request *oldrep = med->med_outstanding_reply;
-        struct ldlm_reply *dlm_rep;
+        struct obd_export *exp = req->rq_export;
+        struct obd_device *obd = exp->exp_obd;
+        struct ldlm_request *dlmreq =
+                lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*dlmreq));
+        struct lustre_handle remote_hdl = dlmreq->lock_handle1;
+        struct list_head *iter;
+
+        if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+                return;
+
+        l_lock(&obd->obd_namespace->ns_lock);
+        list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
+                struct ldlm_lock *lock;
+                lock = list_entry(iter, struct ldlm_lock, l_export_chain);
+                if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
+                        lockh->cookie = lock->l_handle.h_cookie;
+                        DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
+                                  lockh->cookie);
+                        l_unlock(&obd->obd_namespace->ns_lock);
+                        return;
+                }
 
-        if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) &&
-            (mcd->mcd_last_xid == req->rq_xid) && (oldrep != NULL)) {
-                DEBUG_REQ(D_HA, req, "restoring lock handle from %p", oldrep);
-                dlm_rep = lustre_msg_buf(oldrep->rq_repmsg, 0);
-                lockh->addr = dlm_rep->lock_handle.addr;
-                lockh->cookie = dlm_rep->lock_handle.cookie;
         }
+        l_unlock(&obd->obd_namespace->ns_lock);
+        DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
+                  remote_hdl.cookie);
 }
 
 static int ldlm_intent_policy(struct ldlm_namespace *ns,
@@ -1531,17 +1615,23 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
 
         if (req->rq_reqmsg->bufcount > 1) {
                 /* an intent needs to be considered */
-                struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
+                struct ldlm_intent *it;
                 struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
                 struct mds_body *mds_body;
                 struct ldlm_reply *rep;
-                struct lustre_handle lockh;
+                struct lustre_handle lockh = { 0 };
                 struct ldlm_lock *new_lock;
                 int rc, offset = 2, repsize[3] = {sizeof(struct ldlm_reply),
                                                   sizeof(struct mds_body),
                                                   mds->mds_max_mdsize};
 
-                it->opc = NTOH__u64(it->opc);
+                it = lustre_swab_reqbuf (req, 1, sizeof (*it),
+                                         lustre_swab_ldlm_intent);
+                if (it == NULL) {
+                        CERROR ("Intent missing\n");
+                        rc = req->rq_status = -EFAULT;
+                        RETURN (rc);
+                }
 
                 LDLM_DEBUG(lock, "intent policy, opc: %s",
                            ldlm_it2str(it->opc));
@@ -1553,7 +1643,7 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
                         RETURN(rc);
                 }
 
-                rep = lustre_msg_buf(req->rq_repmsg, 0);
+                rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
                 rep->lock_policy_res1 = IT_INTENT_EXEC;
 
                 fixup_handle_for_resent_req(req, &lockh);
@@ -1584,7 +1674,7 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
                                 rep->lock_policy_res2 = req->rq_status;
                                 RETURN(ELDLM_LOCK_ABORTED);
                         }
-                        mds_body = lustre_msg_buf(req->rq_repmsg, 1);
+                        mds_body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*mds_body));
                         if (!(mds_body->valid & OBD_MD_FLEASIZE)) {
                                 rep->lock_policy_res2 = rc;
                                 RETURN(ELDLM_LOCK_ABORTED);
@@ -1611,17 +1701,37 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
                         LBUG();
                 }
 
-                if (flags & LDLM_FL_INTENT_ONLY) {
-                        LDLM_DEBUG(lock, "INTENT_ONLY, aborting lock");
-                        RETURN(ELDLM_LOCK_ABORTED);
-                }
-
                 /* By this point, whatever function we called above must have
                  * filled in 'lockh' or returned an error.  We want to give the
                  * new lock to the client instead of whatever lock it was about
                  * to get. */
                 new_lock = ldlm_handle2lock(&lockh);
                 LASSERT(new_lock != NULL);
+
+                /* If we've already given this lock to a client once, then we
+                 * should have no readers or writers.  Otherwise, we should
+                 * have one reader _or_ writer ref (which will be zeroed below
+                 * before returning the lock to a client.
+                 */
+                if (new_lock->l_export == req->rq_export)
+                        LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
+                else
+                        LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
+
+                /* If we're running an intent only, we want to abort the new
+                 * lock, and let the client abort the original lock. */
+                if (flags & LDLM_FL_INTENT_ONLY) {
+                        LDLM_DEBUG(lock, "INTENT_ONLY, aborting locks");
+                        l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
+                        if (new_lock->l_readers)
+                                ldlm_lock_decref(&lockh, LCK_PR);
+                        else
+                                ldlm_lock_decref(&lockh, LCK_PW);
+                        l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
+                        LDLM_LOCK_PUT(new_lock);
+                        RETURN(ELDLM_LOCK_ABORTED);
+                }
+
                 *lockp = new_lock;
 
                 rep->lock_policy_res2 = req->rq_status;
@@ -1629,14 +1739,13 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
                 if (new_lock->l_export == req->rq_export) {
                         /* Already gave this to the client, which means that we
                          * reconstructed a reply. */
-                        LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & 
+                        LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
                                 MSG_RESENT);
                         RETURN(ELDLM_LOCK_REPLACED);
                 }
 
                 /* Fixup the lock to be given to the client */
                 l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
-                LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
                 new_lock->l_readers = 0;
                 new_lock->l_writers = 0;
 
@@ -1706,7 +1815,8 @@ static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
         mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
                                            MDS_BUFSIZE, MDS_MAXREQSIZE,
                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
-                                           mds_handle, "mds");
+                                           mds_handle, "mds", obddev);
+
         if (!mds->mds_service) {
                 CERROR("failed to start service\n");
                 RETURN(rc = -ENOMEM);
@@ -1726,7 +1836,7 @@ static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
                                 MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
-                                mds_handle, "mds");
+                                mds_handle, "mds_setattr", obddev);
         if (!mds->mds_setattr_service) {
                 CERROR("failed to start getattr service\n");
                 GOTO(err_thread, rc = -ENOMEM);
@@ -1748,7 +1858,7 @@ static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
                                 MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
-                                mds_handle, "mds");
+                                mds_handle, "mds_readpage", obddev);
         if (!mds->mds_readpage_service) {
                 CERROR("failed to start readpage service\n");
                 GOTO(err_thread2, rc = -ENOMEM);
@@ -1781,7 +1891,7 @@ err_thread:
 }
 
 
-static int mdt_cleanup(struct obd_device *obddev)
+static int mdt_cleanup(struct obd_device *obddev, int force, int failover)
 {
         struct mds_obd *mds = &obddev->u.mds;
         ENTRY;
@@ -1803,14 +1913,15 @@ extern int mds_iocontrol(unsigned int cmd, struct lustre_handle *conn,
 
 /* use obd ops to offer management infrastructure */
 static struct obd_ops mds_obd_ops = {
-        o_owner:       THIS_MODULE,
-        o_attach:      mds_attach,
-        o_detach:      mds_detach,
-        o_connect:     mds_connect,
-        o_disconnect:  mds_disconnect,
-        o_setup:       mds_setup,
-        o_cleanup:     mds_cleanup,
-        o_iocontrol:   mds_iocontrol
+        o_owner:          THIS_MODULE,
+        o_attach:         mds_attach,
+        o_detach:         mds_detach,
+        o_connect:        mds_connect,
+        o_disconnect:     mds_disconnect,
+        o_setup:          mds_setup,
+        o_cleanup:        mds_cleanup,
+        o_iocontrol:      mds_iocontrol,
+        o_destroy_export: mds_destroy_export
 };
 
 static struct obd_ops mdt_obd_ops = {
@@ -1825,11 +1936,6 @@ static struct obd_ops mdt_obd_ops = {
 static int __init mds_init(void)
 {
         struct lprocfs_static_vars lvars;
-        mds_file_cache = kmem_cache_create("ll_mds_file_data",
-                                           sizeof(struct mds_file_data),
-                                           0, 0, NULL, NULL);
-        if (mds_file_cache == NULL)
-                return -ENOMEM;
 
         lprocfs_init_multi_vars(0, &lvars);
         class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
@@ -1845,8 +1951,6 @@ static void __exit mds_exit(void)
         ldlm_unregister_intent();
         class_unregister_type(LUSTRE_MDS_NAME);
         class_unregister_type(LUSTRE_MDT_NAME);
-        if (kmem_cache_destroy(mds_file_cache))
-                CERROR("couldn't free MDS file cache\n");
 }
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");