Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / mds / handler.c
index 2be4362..58cfa20 100644 (file)
 #include <linux/init.h>
 #include <linux/obd_class.h>
 #include <linux/random.h>
-#include <linux/locks.h>
 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+#include <linux/smp_lock.h>
 #include <linux/buffer_head.h>
 #include <linux/workqueue.h>
+#include <linux/mount.h>
+#else 
+#include <linux/locks.h>
 #endif
 #include <linux/obd_lov.h>
 #include <linux/lustre_mds.h>
@@ -50,9 +53,8 @@ kmem_cache_t *mds_file_cache;
 extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
                            struct obd_uuid *uuidarray);
 extern int mds_get_lovdesc(struct mds_obd  *obd, struct lov_desc *desc);
-extern void mds_start_transno(struct mds_obd *mds);
-extern int mds_finish_transno(struct mds_obd *mds, void *handle,
-                              struct ptlrpc_request *req, int rc);
+int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
+                       struct ptlrpc_request *req, int rc, int disp);
 static int mds_cleanup(struct obd_device * obddev);
 
 inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
@@ -232,7 +234,6 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
         return result;
 }
 
-static void mds_abort_recovery(void *data);
 
 /* Establish a connection to the MDS.
  *
@@ -247,7 +248,6 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
         struct obd_export *exp;
         struct mds_export_data *med;
         struct mds_client_data *mcd;
-        struct mds_obd *mds = &obd->u.mds;
         int rc;
         ENTRY;
 
@@ -255,10 +255,10 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
                 RETURN(-EINVAL);
 
         /* Check for aborted recovery. */
-        spin_lock_bh(&mds->mds_processing_task_lock);
+        spin_lock_bh(&obd->obd_processing_task_lock);
         if (obd->obd_flags & OBD_ABORT_RECOVERY)
-                mds_abort_recovery(mds);
-        spin_unlock_bh(&mds->mds_processing_task_lock);
+                target_abort_recovery(obd);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
 
         /* XXX There is a small race between checking the list and adding a
          * new connection for the same UUID, but the real threat (list
@@ -317,8 +317,10 @@ inline int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med)
         mfd->mfd_servercookie = DEAD_HANDLE_MAGIC;
         kmem_cache_free(mds_file_cache, mfd);
 
-        if (file->f_dentry->d_parent)
+        if (file->f_dentry->d_parent) {
+                LASSERT(atomic_read(&file->f_dentry->d_parent->d_count));
                 de = dget(file->f_dentry->d_parent);
+        }
         rc = filp_close(file, 0);
         if (de)
                 l_dput(de);
@@ -350,6 +352,13 @@ static int mds_disconnect(struct lustre_handle *conn)
         spin_unlock(&med->med_open_lock);
 
         ldlm_cancel_locks_for_export(export);
+        if (med->med_outstanding_reply) {
+                /* Fake the ack, so the locks get cancelled. */
+                med->med_outstanding_reply->rq_flags &= ~PTL_RPC_FL_WANT_ACK;
+                med->med_outstanding_reply->rq_flags |= PTL_RPC_FL_ERR;
+                wake_up(&med->med_outstanding_reply->rq_wait_for_rep);
+                med->med_outstanding_reply = NULL;
+        }
         mds_client_free(export);
 
         rc = class_disconnect(conn);
@@ -481,14 +490,14 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                 struct lustre_handle lockh;
                 int rc;
 
-                LDLM_DEBUG0(lock, "already unused, calling ldlm_cli_cancel");
+                LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
                 ldlm_lock2handle(lock, &lockh);
                 rc = ldlm_cli_cancel(&lockh);
                 if (rc < 0)
                         CERROR("ldlm_cli_cancel: %d\n", rc);
         } else {
-                LDLM_DEBUG0(lock, "Lock still has references, will be "
-                            "cancelled later");
+                LDLM_DEBUG(lock, "Lock still has references, will be "
+                           "cancelled later");
         }
         RETURN(0);
 }
@@ -532,7 +541,7 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg,
                 rc = 0;
         }
 
-        return rc;
+        RETURN(rc);
 }
 
 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
@@ -623,11 +632,68 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
         return(rc);
 }
 
+/* This is more copy-and-paste from getattr_name than I'd like. */
+static void reconstruct_getattr_name(int offset, struct ptlrpc_request *req,
+                                     struct lustre_handle *client_lockh)
+{
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_client_data *mcd = med->med_mcd;
+        struct obd_device *obd = req->rq_export->exp_obd;
+        struct mds_obd *mds = mds_req2mds(req);
+        struct dentry *parent, *child;
+        struct mds_body *body;
+        struct inode *dir;
+        struct obd_run_ctxt saved;
+        struct obd_ucred uc;
+        int namelen, rc = 0;
+        char *name;
+
+        req->rq_transno = mcd->mcd_last_transno;
+        req->rq_status = mcd->mcd_last_result;
+
+        if (med->med_outstanding_reply)
+                mds_steal_ack_locks(med, req);
+
+        if (req->rq_status)
+                return;
+
+        body = lustre_msg_buf(req->rq_reqmsg, offset);
+        name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
+        namelen = req->rq_reqmsg->buflens[offset + 1];
+        /* requests were at offset 2, replies go back at 1 */
+        if (offset)
+                offset = 1;
+
+        uc.ouc_fsuid = body->fsuid;
+        uc.ouc_fsgid = body->fsgid;
+        uc.ouc_cap = body->capability;
+        uc.ouc_suppgid1 = body->suppgid;
+        uc.ouc_suppgid2 = -1;
+        push_ctxt(&saved, &mds->mds_ctxt, &uc);
+        parent = mds_fid2dentry(mds, &body->fid1, NULL);
+        LASSERT(!IS_ERR(parent));
+        dir = parent->d_inode;
+        LASSERT(dir);
+        child = lookup_one_len(name, parent, namelen - 1);
+        LASSERT(!IS_ERR(child));
+
+        if (!med->med_outstanding_reply) {
+                /* XXX need to enqueue client lock */
+                LBUG();
+        }
+
+        if (req->rq_repmsg == NULL)
+                mds_getattr_pack_msg(req, child->d_inode, offset);
+        
+        rc = mds_getattr_internal(obd, child, req, body, offset);
+        LASSERT(!rc);
+        l_dput(child);
+        l_dput(parent);
+}
+
 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                             struct lustre_handle *child_lockh)
 {
-        struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
-        int lock_mode;
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
         struct obd_run_ctxt saved;
@@ -637,15 +703,18 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         struct obd_ucred uc;
         struct ldlm_res_id child_res_id = { .name = {0} };
         struct lustre_handle parent_lockh;
-        int namelen, flags = 0, rc = 0;
+        int namelen, flags = 0, rc = 0, cleanup_phase = 0;
         char *name;
         ENTRY;
 
         LASSERT(!strcmp(obd->obd_type->typ_name, "mds"));
 
+        MDS_CHECK_RESENT(req, 
+                         reconstruct_getattr_name(offset, req, child_lockh));
+
         if (req->rq_reqmsg->bufcount <= offset + 1) {
                 LBUG();
-                GOTO(out_pre_de, rc = -EINVAL);
+                GOTO(cleanup, rc = -EINVAL);
         }
 
         body = lustre_msg_buf(req->rq_reqmsg, offset);
@@ -658,58 +727,75 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         uc.ouc_fsuid = body->fsuid;
         uc.ouc_fsgid = body->fsgid;
         uc.ouc_cap = body->capability;
-        uc.ouc_suppgid = body->suppgid;
+        uc.ouc_suppgid1 = body->suppgid;
+        uc.ouc_suppgid2 = -1;
         push_ctxt(&saved, &mds->mds_ctxt, &uc);
         /* Step 1: Lookup/lock parent */
         de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
                                    &parent_lockh);
         if (IS_ERR(de))
-                GOTO(out_pre_de, rc = PTR_ERR(de));
+                GOTO(cleanup, rc = PTR_ERR(de));
         dir = de->d_inode;
         LASSERT(dir);
 
+        cleanup_phase = 1; /* parent dentry and lock */
+
         CDEBUG(D_INODE, "parent ino %lu, name %*s\n", dir->i_ino,namelen,name);
 
         /* Step 2: Lookup child */
         dchild = lookup_one_len(name, de, namelen - 1);
         if (IS_ERR(dchild)) {
                 CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
-                GOTO(out_step_1, rc = PTR_ERR(dchild));
-        } else if (dchild->d_inode == NULL) {
-                GOTO(out_step_2, rc = -ENOENT);
+                GOTO(cleanup, rc = PTR_ERR(dchild));
+        }
+
+        cleanup_phase = 2; /* child dentry */
+
+        if (dchild->d_inode == NULL) {
+                GOTO(cleanup, rc = -ENOENT);
         }
 
         /* Step 3: Lock child */
-        if (it->opc == IT_SETATTR)
-                lock_mode = LCK_PW;
-        else
-                lock_mode = LCK_PR;
         child_res_id.name[0] = dchild->d_inode->i_ino;
         child_res_id.name[1] = dchild->d_inode->i_generation;
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
-                              child_res_id, LDLM_PLAIN, NULL, 0, lock_mode,
+                              child_res_id, LDLM_PLAIN, NULL, 0, LCK_PR,
                               &flags, ldlm_completion_ast, mds_blocking_ast,
                               NULL, NULL, child_lockh);
         if (rc != ELDLM_OK) {
                 CERROR("ldlm_cli_enqueue: %d\n", rc);
-                GOTO(out_step_2, rc = -EIO);
+                GOTO(cleanup, rc = -EIO);
         }
 
+        cleanup_phase = 3; /* child lock */
+
         if (req->rq_repmsg == NULL)
                 mds_getattr_pack_msg(req, dchild->d_inode, offset);
 
         rc = mds_getattr_internal(obd, dchild, req, body, offset);
-        if (rc)
-                GOTO(out_step_3, rc);
-        GOTO(out_step_2, rc); /* returns the lock to the client */
- out_step_3:
-        ldlm_lock_decref(child_lockh, LCK_PR);
- out_step_2:
-        l_dput(dchild);
- out_step_1:
-        ldlm_lock_decref(&parent_lockh, LCK_PR);
-        l_dput(de);
- out_pre_de:
+        GOTO(cleanup, rc); /* returns the lock to the client */
+        
+ cleanup:
+        rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, NULL,
+                                req, rc, 0);
+        switch (cleanup_phase) {
+        case 3:
+                if (rc)
+                        ldlm_lock_decref(child_lockh, LCK_PR);
+        case 2:
+                l_dput(dchild);
+
+        case 1:
+                if (rc) {
+                        ldlm_lock_decref(&parent_lockh, LCK_PR);
+                } else {
+                        memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
+                               sizeof(parent_lockh));
+                        req->rq_ack_locks[0].mode = LCK_PR;
+                }
+                l_dput(de);
+        default: ;
+        }
         req->rq_status = rc;
         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
         return rc;
@@ -822,20 +908,14 @@ static int mds_store_md(struct mds_obd *mds, struct ptlrpc_request *req,
         uc.ouc_fsgid = body->fsgid;
         uc.ouc_cap = body->capability;
         push_ctxt(&saved, &mds->mds_ctxt, &uc);
-        mds_start_transno(mds);
         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
         if (IS_ERR(handle)) {
                 rc = PTR_ERR(handle);
-                mds_finish_transno(mds, handle, req, rc);
                 GOTO(out_ea, rc);
         }
 
         rc = fsfilt_set_md(obd, inode,handle,lmm,lmm_size);
-        rc = mds_finish_transno(mds, handle, req, rc);
-
-        rc2 = fsfilt_commit(obd, inode, handle);
-        if (rc2 && !rc)
-                rc = rc2;
+        rc = mds_finish_transno(mds, inode, handle, req, rc, 0);
 out_ea:
         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
 
@@ -844,6 +924,20 @@ out_ea:
 
 #endif
 
+static void reconstruct_close(struct ptlrpc_request *req)
+{
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_client_data *mcd = med->med_mcd;
+
+        req->rq_transno = mcd->mcd_last_transno;
+        req->rq_status = mcd->mcd_last_result;
+
+        /* XXX When open-unlink is working, we'll need to steal ack locks as
+         * XXX well, and make sure that we do the right unlinking after we
+         * XXX get the ack back.
+         */
+}
+
 static int mds_close(struct ptlrpc_request *req)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
@@ -852,6 +946,8 @@ static int mds_close(struct ptlrpc_request *req)
         int rc;
         ENTRY;
 
+        MDS_CHECK_RESENT(req, reconstruct_close(req));
+
         body = lustre_msg_buf(req->rq_reqmsg, 0);
 
         mfd = mds_handle2mfd(&body->handle);
@@ -956,199 +1052,8 @@ int mds_reint(struct ptlrpc_request *req, int offset,
         return rc;
 }
 
-/* forward declaration */
-int mds_handle(struct ptlrpc_request *req);
-
-static void abort_delayed_replies(struct mds_obd *mds)
-{
-        struct ptlrpc_request *req;
-        struct list_head *tmp, *n;
-        list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                DEBUG_REQ(D_ERROR, req, "aborted:");
-                req->rq_status = -ENOTCONN;
-                req->rq_type = PTL_RPC_MSG_ERR;
-                ptlrpc_reply(req->rq_svc, req);
-                list_del(&req->rq_list);
-                OBD_FREE(req, sizeof *req);
-        }
-}
-
-static void mds_abort_recovery(void *data)
-{
-        struct mds_obd *mds = data;
-        struct obd_device *obd = list_entry(mds, struct obd_device, u.mds);
-        CERROR("disconnecting clients and aborting recovery\n");
-        mds->mds_recoverable_clients = 0;
-        obd->obd_flags &= ~(OBD_RECOVERING | OBD_ABORT_RECOVERY);
-        abort_delayed_replies(mds);
-        spin_unlock_bh(&mds->mds_processing_task_lock);
-        class_disconnect_all(obd);
-        spin_lock_bh(&mds->mds_processing_task_lock);
-}
-
-static void mds_recovery_expired(unsigned long castmeharder)
-{
-        struct mds_obd *mds = (struct mds_obd *)castmeharder;
-        struct obd_device *obd = list_entry(mds, struct obd_device, u.mds);
-        CERROR("recovery timed out, aborting\n");
-        spin_lock_bh(&mds->mds_processing_task_lock);
-        obd->obd_flags |= OBD_ABORT_RECOVERY;
-        wake_up(&mds->mds_next_transno_waitq);
-        spin_unlock_bh(&mds->mds_processing_task_lock);
-}
-
-static void reset_recovery_timer(struct mds_obd *mds)
-{
-        CDEBUG(D_ERROR, "timer will expire in %ld seconds\n",
-               MDS_RECOVERY_TIMEOUT / HZ);
-        mod_timer(&mds->mds_recovery_timer, jiffies + MDS_RECOVERY_TIMEOUT);
-}
-
-static void start_recovery_timer(struct mds_obd *mds)
-{
-        mds->mds_recovery_timer.function = mds_recovery_expired;
-        mds->mds_recovery_timer.data = (unsigned long)mds;
-        init_timer(&mds->mds_recovery_timer);
-        reset_recovery_timer(mds);
-}
-
-static void cancel_recovery_timer(struct mds_obd *mds)
-{
-        del_timer(&mds->mds_recovery_timer);
-}
-
-static int check_for_next_transno(struct mds_obd *mds)
-{
-        struct ptlrpc_request *req;
-        struct obd_device *obd = list_entry(mds, struct obd_device, u.mds);
-        req = list_entry(mds->mds_recovery_queue.next,
-                         struct ptlrpc_request, rq_list);
-        LASSERT(req->rq_reqmsg->transno >= mds->mds_next_recovery_transno);
-
-        return req->rq_reqmsg->transno == mds->mds_next_recovery_transno ||
-                (obd->obd_flags & OBD_RECOVERING) == 0;
-}
-
-static void process_recovery_queue(struct mds_obd *mds)
-{
-        struct ptlrpc_request *req;
-        struct obd_device *obd = list_entry(mds, struct obd_device, u.mds);
-        int aborted = 0;
-        ENTRY;
-
-        for (;;) {
-                spin_lock_bh(&mds->mds_processing_task_lock);
-                LASSERT(mds->mds_processing_task == current->pid);
-                req = list_entry(mds->mds_recovery_queue.next,
-                                 struct ptlrpc_request, rq_list);
-
-                if (req->rq_reqmsg->transno != mds->mds_next_recovery_transno) {
-                        spin_unlock_bh(&mds->mds_processing_task_lock);
-                        CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
-                               LPD64")\n",
-                               mds->mds_next_recovery_transno,
-                               req->rq_reqmsg->transno);
-                        wait_event(mds->mds_next_transno_waitq,
-                                   check_for_next_transno(mds));
-                        spin_lock_bh(&mds->mds_processing_task_lock);
-                        if (obd->obd_flags & OBD_ABORT_RECOVERY) {
-                                mds_abort_recovery(mds);
-                                aborted = 1;
-                        }
-                        spin_unlock_bh(&mds->mds_processing_task_lock);
-                        if (aborted)
-                                return;
-                        continue;
-                }
-                list_del_init(&req->rq_list);
-                spin_unlock_bh(&mds->mds_processing_task_lock);
-
-                DEBUG_REQ(D_ERROR, req, "processing: ");
-                (void)mds_handle(req);
-                reset_recovery_timer(mds);
-                mds_fsync_super(mds->mds_sb);
-                OBD_FREE(req, sizeof *req);
-                spin_lock_bh(&mds->mds_processing_task_lock);
-                mds->mds_next_recovery_transno++;
-                if (list_empty(&mds->mds_recovery_queue)) {
-                        mds->mds_processing_task = 0;
-                        spin_unlock_bh(&mds->mds_processing_task_lock);
-                        break;
-                }
-                spin_unlock_bh(&mds->mds_processing_task_lock);
-        }
-        EXIT;
-}
-
-static int queue_recovery_request(struct ptlrpc_request *req,
-                                  struct mds_obd *mds)
-{
-        struct list_head *tmp;
-        int inserted = 0;
-        __u64 transno = req->rq_reqmsg->transno;
-        struct ptlrpc_request *saved_req;
-
-        if (!transno) {
-                INIT_LIST_HEAD(&req->rq_list);
-                DEBUG_REQ(D_HA, req, "not queueing");
-                return 1;
-        }
-
-        spin_lock_bh(&mds->mds_processing_task_lock);
-
-        if (mds->mds_processing_task == current->pid) {
-                /* Processing the queue right now, don't re-add. */
-                LASSERT(list_empty(&req->rq_list));
-                spin_unlock_bh(&mds->mds_processing_task_lock);
-                return 1;
-        }
-
-        OBD_ALLOC(saved_req, sizeof *saved_req);
-        if (!saved_req)
-                LBUG();
-        memcpy(saved_req, req, sizeof *req);
-        req = saved_req;
-        INIT_LIST_HEAD(&req->rq_list);
-
-        /* XXX O(n^2) */
-        list_for_each(tmp, &mds->mds_recovery_queue) {
-                struct ptlrpc_request *reqiter =
-                        list_entry(tmp, struct ptlrpc_request, rq_list);
-
-                if (reqiter->rq_reqmsg->transno > transno) {
-                        list_add_tail(&req->rq_list, &reqiter->rq_list);
-                        inserted = 1;
-                        break;
-                }
-        }
-
-        if (!inserted) {
-                list_add_tail(&req->rq_list, &mds->mds_recovery_queue);
-        }
-
-        if (mds->mds_processing_task != 0) {
-                /* Someone else is processing this queue, we'll leave it to
-                 * them.
-                 */
-                if (transno == mds->mds_next_recovery_transno)
-                        wake_up(&mds->mds_next_transno_waitq);
-                spin_unlock_bh(&mds->mds_processing_task_lock);
-                return 0;
-        }
-
-        /* Nobody is processing, and we know there's (at least) one to process
-         * now, so we'll do the honours.
-         */
-        mds->mds_processing_task = current->pid;
-        spin_unlock_bh(&mds->mds_processing_task_lock);
-
-        process_recovery_queue(mds);
-        return 0;
-}
-
 static int filter_recovery_request(struct ptlrpc_request *req,
-                                   struct mds_obd *mds, int *process)
+                                   struct obd_device *obd, int *process)
 {
         switch (req->rq_reqmsg->opc) {
         case MDS_CONNECT: /* This will never get here, but for completeness. */
@@ -1160,7 +1065,7 @@ static int filter_recovery_request(struct ptlrpc_request *req,
         case MDS_GETSTATUS: /* used in unmounting */
         case MDS_REINT:
         case LDLM_ENQUEUE:
-                *process = queue_recovery_request(req, mds);
+                *process = target_queue_recovery_request(req, obd);
                 RETURN(0);
 
         default:
@@ -1171,48 +1076,6 @@ static int filter_recovery_request(struct ptlrpc_request *req,
         }
 }
 
-static int mds_queue_final_reply(struct ptlrpc_request *req, int rc)
-{
-        struct mds_obd *mds = mds_req2mds(req);
-        struct obd_device *mds_obd = list_entry(mds, struct obd_device, u.mds);
-        struct ptlrpc_request *saved_req;
-
-        spin_lock_bh(&mds->mds_processing_task_lock);
-        if (rc) {
-                /* Just like ptlrpc_error, but without the sending. */
-                lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
-                                &req->rq_repmsg);
-                req->rq_type = PTL_RPC_MSG_ERR;
-        }
-
-        LASSERT(list_empty(&req->rq_list));
-        OBD_ALLOC(saved_req, sizeof *saved_req);
-        memcpy(saved_req, req, sizeof *saved_req);
-        req = saved_req;
-        list_add(&req->rq_list, &mds->mds_delayed_reply_queue);
-        if (--mds->mds_recoverable_clients == 0) {
-                struct list_head *tmp, *n;
-                ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
-                CDEBUG(D_ERROR,
-                       "all clients recovered, sending delayed replies\n");
-                mds_obd->obd_flags &= ~OBD_RECOVERING;
-                list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) {
-                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                        DEBUG_REQ(D_ERROR, req, "delayed:");
-                        ptlrpc_reply(req->rq_svc, req);
-                        list_del(&req->rq_list);
-                        OBD_FREE(req, sizeof *req);
-                }
-                cancel_recovery_timer(mds);
-        } else {
-                CERROR("%d recoverable clients remain\n",
-                       mds->mds_recoverable_clients);
-        }
-
-        spin_unlock_bh(&mds->mds_processing_task_lock);
-        return 1;
-}
-
 static char *reint_names[] = {
         [REINT_SETATTR] "setattr",
         [REINT_CREATE]  "create",
@@ -1222,11 +1085,91 @@ static char *reint_names[] = {
         [REINT_OPEN]    "open",
 };
 
+void mds_steal_ack_locks(struct mds_export_data *med,
+                         struct ptlrpc_request *req)
+{
+        struct ptlrpc_request *oldrep = med->med_outstanding_reply;
+        memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
+               sizeof req->rq_ack_locks);
+        oldrep->rq_flags |= PTL_RPC_FL_RESENT;
+        wake_up(&oldrep->rq_wait_for_rep);
+        DEBUG_REQ(D_HA, oldrep, "stole locks from");
+        DEBUG_REQ(D_HA, req, "stole locks for");
+}
+
+static void mds_send_reply(struct ptlrpc_request *req, int rc)
+{
+        int i;
+        struct ptlrpc_req_ack_lock *ack_lock;
+        struct l_wait_info lwi;
+        struct mds_export_data *med =
+                (req->rq_export && req->rq_ack_locks[0].mode) ?
+                &req->rq_export->exp_mds_data : NULL;
+
+        if (med) {
+                med->med_outstanding_reply = req;
+                req->rq_flags |= PTL_RPC_FL_WANT_ACK;
+                init_waitqueue_head(&req->rq_wait_for_rep);
+        }
+
+        if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_ALL_REPLY_NET | OBD_FAIL_ONCE)) {
+                if (rc) {
+                        DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
+                        ptlrpc_error(req->rq_svc, req);
+                } else {
+                        DEBUG_REQ(D_NET, req, "sending reply");
+                        ptlrpc_reply(req->rq_svc, req);
+                }
+        } else {
+                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
+                DEBUG_REQ(D_ERROR, req, "dropping reply");
+                if (!med && req->rq_repmsg)
+                        OBD_FREE(req->rq_repmsg, req->rq_replen);
+        }
+
+        if (!med) {
+                DEBUG_REQ(D_HA, req, "not waiting for ack");
+                return;
+        }
+
+        lwi = LWI_TIMEOUT(obd_timeout / 2 * HZ, NULL, NULL);
+        rc = l_wait_event(req->rq_wait_for_rep, 
+                          (req->rq_flags & PTL_RPC_FL_WANT_ACK) == 0 ||
+                          (req->rq_flags & PTL_RPC_FL_RESENT),
+                          &lwi);
+
+        if (req->rq_flags & PTL_RPC_FL_RESENT) {
+                /* The client resent this request, so abort the
+                 * waiting-ack portals stuff, and don't decref the
+                 * locks.
+                 */
+                DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
+                ptlrpc_abort(req);
+                return;
+        }
+
+        if (rc == -ETIMEDOUT) {
+                ptlrpc_abort(req);
+                recovd_conn_fail(req->rq_export->exp_connection);
+                DEBUG_REQ(D_HA, req, "cancelling locks for timeout");
+        } else {
+                DEBUG_REQ(D_HA, req, "cancelling locks for ack");
+        }
+        
+        med->med_outstanding_reply = NULL;
+        
+        for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
+                if (!ack_lock->mode)
+                        break;
+                ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
+        }
+}
+
 int mds_handle(struct ptlrpc_request *req)
 {
         int should_process, rc;
         struct mds_obd *mds = NULL; /* quell gcc overwarning */
-        struct obd_device *mds_obd = NULL;
+        struct obd_device *obd = NULL;
         ENTRY;
 
         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
@@ -1247,37 +1190,25 @@ int mds_handle(struct ptlrpc_request *req)
                 }
 
                 med = &req->rq_export->exp_mds_data;
-                mds_obd = req->rq_export->exp_obd;
-                mds = &mds_obd->u.mds;
-                spin_lock_bh(&mds->mds_processing_task_lock);
-                if (mds_obd->obd_flags & OBD_ABORT_RECOVERY)
-                        mds_abort_recovery(mds);
-                spin_unlock_bh(&mds->mds_processing_task_lock);
-
-                if (mds_obd->obd_flags & OBD_RECOVERING) {
-                        rc = filter_recovery_request(req, mds, &should_process);
+                obd = req->rq_export->exp_obd;
+                mds = &obd->u.mds;
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                if (obd->obd_flags & OBD_ABORT_RECOVERY)
+                        target_abort_recovery(obd);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+
+                if (obd->obd_flags & OBD_RECOVERING) {
+                        rc = filter_recovery_request(req, obd, &should_process);
                         if (rc || !should_process)
                                 RETURN(rc);
-                } else if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
-                        if (req->rq_xid == med->med_last_xid) {
-                                DEBUG_REQ(D_HA, req, "resending reply");
-                                OBD_ALLOC(req->rq_repmsg, med->med_last_replen);
-                                req->rq_replen = med->med_last_replen;
-                                memcpy(req->rq_repmsg, med->med_last_reply,
-                                       req->rq_replen);
-                                ptlrpc_reply(req->rq_svc, req);
-                                return 0;
-                        }
-                        DEBUG_REQ(D_HA, req, "no reply for resend, continuing");
                 }
-
         }
 
         switch (req->rq_reqmsg->opc) {
         case MDS_CONNECT:
                 DEBUG_REQ(D_INODE, req, "connect");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
-                rc = target_handle_connect(req);
+                rc = target_handle_connect(req, mds_handle);
                 /* Make sure that last_rcvd is correct. */
                 if (!rc) {
                         /* Now that we have an export, set mds. */
@@ -1317,8 +1248,14 @@ int mds_handle(struct ptlrpc_request *req)
                 struct lustre_handle lockh;
                 DEBUG_REQ(D_INODE, req, "getattr_name");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
+
+                /* If this request gets a reconstructed reply, we won't be
+                 * acquiring any new locks in mds_getattr_name, so we don't
+                 * want to cancel.
+                 */
+                lockh.addr = 0;
                 rc = mds_getattr_name(0, req, &lockh);
-                if (rc == 0)
+                if (rc == 0 && lockh.addr)
                         ldlm_lock_decref(&lockh, LCK_PR);
                 break;
         }
@@ -1329,7 +1266,7 @@ int mds_handle(struct ptlrpc_request *req)
                 break;
 
         case MDS_READPAGE:
-                DEBUG_REQ(D_INODE, req, "readpage\n");
+                DEBUG_REQ(D_INODE, req, "readpage");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
                 rc = mds_readpage(req);
 
@@ -1408,49 +1345,23 @@ int mds_handle(struct ptlrpc_request *req)
                         DEBUG_REQ(D_IOCTL, req,
                                   "not sending last_committed update");
                 }
-                CDEBUG(D_INFO, "last_transno %Lu, last_committed %Lu, xid %d\n",
-                       (unsigned long long)mds->mds_last_rcvd,
-                       (unsigned long long)obd->obd_last_committed,
-                       cpu_to_le32(req->rq_xid));
+                CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
+                       ", xid "LPU64"\n",
+                       mds->mds_last_transno, obd->obd_last_committed,
+                       NTOH__u64(req->rq_xid));
         }
  out:
 
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
-                if (mds_obd && (mds_obd->obd_flags & OBD_RECOVERING)) {
+                if (obd && (obd->obd_flags & OBD_RECOVERING)) {
                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
-                        return mds_queue_final_reply(req, rc);
+                        return target_queue_final_reply(req, rc);
                 }
                 /* Lost a race with recovery; let the error path DTRT. */
                 rc = req->rq_status = -ENOTCONN;
         }
 
-        if (req->rq_export && mds_obd &&
-            (mds_obd->obd_flags & OBD_RECOVERING) == 0) {
-                struct mds_export_data *med = &req->rq_export->exp_mds_data;
-                if (med->med_last_reply)
-                        OBD_FREE(med->med_last_reply, med->med_last_replen);
-                OBD_ALLOC(med->med_last_reply, req->rq_replen);
-                med->med_last_replen = req->rq_replen;
-                med->med_last_xid = req->rq_xid;
-                memcpy(med->med_last_reply, req->rq_repmsg, req->rq_replen);
-                /* XXX serialize */
-        }
-
-        if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_ALL_REPLY_NET | OBD_FAIL_ONCE)) {
-                if (rc) {
-                        DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
-                        ptlrpc_error(req->rq_svc, req);
-                } else {
-                        DEBUG_REQ(D_NET, req, "sending reply");
-                        ptlrpc_reply(req->rq_svc, req);
-                }
-        } else {
-                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
-                DEBUG_REQ(D_ERROR, req, "dropping reply");
-                if (req->rq_repmsg)
-                        OBD_FREE(req->rq_repmsg, req->rq_replen);
-        }
-
+        mds_send_reply(req, rc);
         return 0;
 }
 
@@ -1459,7 +1370,7 @@ int mds_handle(struct ptlrpc_request *req)
  * then the server last_rcvd value may be less than that of the clients.
  * This will alert us that we may need to do client recovery.
  *
- * Also assumes for mds_last_rcvd that we are not modifying it (no locking).
+ * Also assumes for mds_last_transno that we are not modifying it (no locking).
  */
 int mds_update_server_data(struct mds_obd *mds)
 {
@@ -1470,12 +1381,12 @@ int mds_update_server_data(struct mds_obd *mds)
         int rc;
 
         push_ctxt(&saved, &mds->mds_ctxt, NULL);
-        msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
+        msd->msd_last_transno = cpu_to_le64(mds->mds_last_transno);
         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
 
-        CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
+        CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_transno is %Lu\n",
                (unsigned long long)mds->mds_mount_count,
-               (unsigned long long)mds->mds_last_rcvd);
+               (unsigned long long)mds->mds_last_transno);
         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
         if (rc != sizeof(*msd)) {
                 CERROR("error writing MDS server data: rc = %d\n", rc);
@@ -1486,7 +1397,7 @@ int mds_update_server_data(struct mds_obd *mds)
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
 #else
-        rc = file_fsync(filp,  filp->f_dentry, 1);
+        rc = file_fsync(filp, filp->f_dentry, 1);
 #endif
         if (rc)
                 CERROR("error flushing MDS server data: rc = %d\n", rc);
@@ -1527,7 +1438,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (!mds->mds_sb)
                 GOTO(err_put, rc = -ENODEV);
 
-        init_MUTEX(&mds->mds_transno_sem);
+        spin_lock_init(&mds->mds_transno_lock);
         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
         rc = mds_fs_setup(obddev, mnt);
         if (rc) {
@@ -1535,9 +1446,6 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(err_put, rc);
         }
 
-        if (obddev->obd_flags & OBD_RECOVERING)
-                start_recovery_timer(mds);
-
         obddev->obd_namespace =
                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
         if (obddev->obd_namespace == NULL) {
@@ -1548,12 +1456,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "mds_ldlm_client", &obddev->obd_ldlm_client);
 
-        spin_lock_init(&mds->mds_processing_task_lock);
-        mds->mds_processing_task = 0;
         mds->mds_has_lov_desc = 0;
-        INIT_LIST_HEAD(&mds->mds_recovery_queue);
-        INIT_LIST_HEAD(&mds->mds_delayed_reply_queue);
-        init_waitqueue_head(&mds->mds_next_transno_waitq);
 
         RETURN(0);
 
@@ -1566,7 +1469,7 @@ err_put:
         lock_kernel();
 err_ops:
         fsfilt_put_ops(obddev->obd_fsops);
-        RETURN(rc);
+        return rc;
 }
 
 static int mds_cleanup(struct obd_device *obddev)
@@ -1597,6 +1500,23 @@ static int mds_cleanup(struct obd_device *obddev)
         RETURN(0);
 }
 
+inline void fixup_handle_for_resent_req(struct ptlrpc_request *req,
+                                        struct lustre_handle *lockh)
+{
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_client_data *mcd = med->med_mcd;
+        struct ptlrpc_request *oldrep = med->med_outstanding_reply;
+        struct ldlm_reply *dlm_rep;
+
+        if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) &&
+            (mcd->mcd_last_xid == req->rq_xid) && (oldrep != NULL)) {
+                DEBUG_REQ(D_HA, req, "restoring lock handle from %p", oldrep);
+                dlm_rep = lustre_msg_buf(oldrep->rq_repmsg, 0);
+                lockh->addr = dlm_rep->lock_handle.addr;
+                lockh->cookie = dlm_rep->lock_handle.cookie;
+        }
+}
+
 static int ldlm_intent_policy(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp, void *req_cookie,
                               ldlm_mode_t mode, int flags, void *data)
@@ -1636,6 +1556,8 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
                 rep = lustre_msg_buf(req->rq_repmsg, 0);
                 rep->lock_policy_res1 = IT_INTENT_EXEC;
 
+                fixup_handle_for_resent_req(req, &lockh);
+
                 /* execute policy */
                 switch ((long)it->opc) {
                 case IT_OPEN:
@@ -1671,7 +1593,6 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
                 case IT_GETATTR:
                 case IT_LOOKUP:
                 case IT_READDIR:
-                case IT_SETATTR:
                         rc = mds_getattr_name(offset, req, &lockh);
                         /* FIXME: we need to sit down and decide on who should
                          * set req->rq_status, who should return negative and
@@ -1691,7 +1612,7 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
                 }
 
                 if (flags & LDLM_FL_INTENT_ONLY) {
-                        LDLM_DEBUG0(lock, "INTENT_ONLY, aborting lock");
+                        LDLM_DEBUG(lock, "INTENT_ONLY, aborting lock");
                         RETURN(ELDLM_LOCK_ABORTED);
                 }
 
@@ -1701,9 +1622,18 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
                  * to get. */
                 new_lock = ldlm_handle2lock(&lockh);
                 LASSERT(new_lock != NULL);
-                mds_body = lustre_msg_buf(req->rq_repmsg, 1);
                 *lockp = new_lock;
 
+                rep->lock_policy_res2 = req->rq_status;
+
+                if (new_lock->l_export == req->rq_export) {
+                        /* Already gave this to the client, which means that we
+                         * reconstructed a reply. */
+                        LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & 
+                                MSG_RESENT);
+                        RETURN(ELDLM_LOCK_REPLACED);
+                }
+
                 /* Fixup the lock to be given to the client */
                 l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
@@ -1728,8 +1658,6 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
                 LDLM_LOCK_PUT(new_lock);
                 l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
 
-                rep->lock_policy_res2 = req->rq_status;
-
                 RETURN(ELDLM_LOCK_REPLACED);
         } else {
                 int size = sizeof(struct ldlm_reply);
@@ -1772,14 +1700,13 @@ int mdt_detach(struct obd_device *dev)
 static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
         struct mds_obd *mds = &obddev->u.mds;
-        struct obd_uuid uuid = { "self" };
         int i, rc = 0;
         ENTRY;
 
         mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
                                            MDS_BUFSIZE, MDS_MAXREQSIZE,
                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
-                                           &uuid, mds_handle, "mds");
+                                           mds_handle, "mds");
         if (!mds->mds_service) {
                 CERROR("failed to start service\n");
                 RETURN(rc = -ENOMEM);
@@ -1795,12 +1722,12 @@ static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
                 }
         }
 
-        mds->mds_getattr_service =
+        mds->mds_setattr_service =
                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
-                                MDS_GETATTR_PORTAL, MDC_REPLY_PORTAL,
-                                &uuid, mds_handle, "mds");
-        if (!mds->mds_getattr_service) {
+                                MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
+                                mds_handle, "mds");
+        if (!mds->mds_setattr_service) {
                 CERROR("failed to start getattr service\n");
                 GOTO(err_thread, rc = -ENOMEM);
         }
@@ -1808,20 +1735,45 @@ static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
         for (i = 0; i < MDT_NUM_THREADS; i++) {
                 char name[32];
                 sprintf(name, "ll_mdt_attr_%02d", i);
-                rc = ptlrpc_start_thread(obddev, mds->mds_getattr_service,
+                rc = ptlrpc_start_thread(obddev, mds->mds_setattr_service,
                                          name);
                 if (rc) {
-                        CERROR("cannot start MDT getattr thread #%d: rc %d\n",
+                        CERROR("cannot start MDT setattr thread #%d: rc %d\n",
                                i, rc);
                         GOTO(err_thread2, rc);
                 }
         }
 
+        mds->mds_readpage_service =
+                ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
+                                MDS_BUFSIZE, MDS_MAXREQSIZE,
+                                MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
+                                mds_handle, "mds");
+        if (!mds->mds_readpage_service) {
+                CERROR("failed to start readpage service\n");
+                GOTO(err_thread2, rc = -ENOMEM);
+        }
+
+        for (i = 0; i < MDT_NUM_THREADS; i++) {
+                char name[32];
+                sprintf(name, "ll_mdt_rdpg_%02d", i);
+                rc = ptlrpc_start_thread(obddev, mds->mds_readpage_service,
+                                         name);
+                if (rc) {
+                        CERROR("cannot start MDT readpage thread #%d: rc %d\n",
+                               i, rc);
+                        GOTO(err_thread3, rc);
+                }
+        }
+
         RETURN(0);
 
+err_thread3:
+        ptlrpc_stop_all_threads(mds->mds_readpage_service);
+        ptlrpc_unregister_service(mds->mds_readpage_service);
 err_thread2:
-        ptlrpc_stop_all_threads(mds->mds_getattr_service);
-        ptlrpc_unregister_service(mds->mds_getattr_service);
+        ptlrpc_stop_all_threads(mds->mds_setattr_service);
+        ptlrpc_unregister_service(mds->mds_setattr_service);
 err_thread:
         ptlrpc_stop_all_threads(mds->mds_service);
         ptlrpc_unregister_service(mds->mds_service);
@@ -1834,8 +1786,11 @@ static int mdt_cleanup(struct obd_device *obddev)
         struct mds_obd *mds = &obddev->u.mds;
         ENTRY;
 
-        ptlrpc_stop_all_threads(mds->mds_getattr_service);
-        ptlrpc_unregister_service(mds->mds_getattr_service);
+        ptlrpc_stop_all_threads(mds->mds_readpage_service);
+        ptlrpc_unregister_service(mds->mds_readpage_service);
+
+        ptlrpc_stop_all_threads(mds->mds_setattr_service);
+        ptlrpc_unregister_service(mds->mds_setattr_service);
 
         ptlrpc_stop_all_threads(mds->mds_service);
         ptlrpc_unregister_service(mds->mds_service);