Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 998c462..94a068d 100644 (file)
@@ -33,6 +33,8 @@
 #include <linux/lustre_ha.h>
 #include <linux/lustre_import.h>
 
+#include "ptlrpc_internal.h"
+
 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
                         struct ptlrpc_client *cl)
 {
@@ -70,7 +72,8 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
         return c;
 }
 
-void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,struct obd_uuid *uuid)
+void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
+                                 struct obd_uuid *uuid)
 {
         struct ptlrpc_peer peer;
         int err;
@@ -85,69 +88,123 @@ void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,struct obd_uuid
         return;
 }
 
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
+static inline struct ptlrpc_bulk_desc *new_bulk(void)
 {
         struct ptlrpc_bulk_desc *desc;
 
         OBD_ALLOC(desc, sizeof(*desc));
-        if (desc != NULL) {
-                desc->bd_connection = ptlrpc_connection_addref(conn);
-                atomic_set(&desc->bd_refcount, 1);
-                init_waitqueue_head(&desc->bd_waitq);
-                INIT_LIST_HEAD(&desc->bd_page_list);
-                INIT_LIST_HEAD(&desc->bd_set_chain);
-                ptl_set_inv_handle(&desc->bd_md_h);
-                ptl_set_inv_handle(&desc->bd_me_h);
-        }
+        if (!desc)
+                return NULL;
+
+        spin_lock_init (&desc->bd_lock);
+        init_waitqueue_head(&desc->bd_waitq);
+        INIT_LIST_HEAD(&desc->bd_page_list);
+        desc->bd_md_h = PTL_HANDLE_NONE;
+        desc->bd_me_h = PTL_HANDLE_NONE;
 
         return desc;
 }
 
-int ptlrpc_bulk_error(struct ptlrpc_bulk_desc *desc)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
+                                               int type, int portal)
 {
-        int rc = 0;
-        if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
-                rc = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
-                      -ETIMEDOUT);
-        }
-        return rc;
+        struct obd_import       *imp = req->rq_import;
+        unsigned long            flags;
+        struct ptlrpc_bulk_desc *desc;
+
+        LASSERT (type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
+        
+        desc = new_bulk();
+        if (desc == NULL)
+                RETURN(NULL);
+        
+        /* Is this sampled at the right place?  Do we want to get the import
+         * generation just before we send?  Should it match the generation of
+         * the request? */
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        desc->bd_import_generation = imp->imp_generation;
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+        desc->bd_import = class_import_get(imp);
+        desc->bd_req = req;
+        desc->bd_type = type;
+        desc->bd_portal = portal;
+
+        /* This makes req own desc, and free it when she frees herself */
+        req->rq_bulk = desc;
+
+        return desc;
 }
 
-struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
+                                               int type, int portal)
+{
+        struct obd_export       *exp = req->rq_export;
+        struct ptlrpc_bulk_desc *desc;
+
+        LASSERT (type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
+        
+        desc = new_bulk();
+        if (desc == NULL)
+                RETURN(NULL);
+
+        desc->bd_export = class_export_get(exp);
+        desc->bd_req = req;
+        desc->bd_type = type;
+        desc->bd_portal = portal;
+
+        /* NB we don't assign rq_bulk here; server-side requests are
+         * re-used, and the handler frees the bulk desc explicitly. */
+
+        return desc;
+}
+
+int ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
+                          struct page *page, int pageoffset, int len)
 {
         struct ptlrpc_bulk_page *bulk;
 
         OBD_ALLOC(bulk, sizeof(*bulk));
-        if (bulk != NULL) {
-                bulk->bp_desc = desc;
-                list_add_tail(&bulk->bp_link, &desc->bd_page_list);
-                desc->bd_page_count++;
-        }
-        return bulk;
+        if (bulk == NULL)
+                return (-ENOMEM);
+
+        LASSERT (page != NULL);
+        LASSERT (pageoffset >= 0);
+        LASSERT (len > 0);
+        LASSERT (pageoffset + len <= PAGE_SIZE);
+
+        bulk->bp_page = page;
+        bulk->bp_pageoffset = pageoffset;
+        bulk->bp_buflen = len;
+
+        bulk->bp_desc = desc;
+        list_add_tail(&bulk->bp_link, &desc->bd_page_list);
+        desc->bd_page_count++;
+        return 0;
 }
 
 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 {
         struct list_head *tmp, *next;
         ENTRY;
-        if (desc == NULL) {
-                EXIT;
-                return;
-        }
-
-        LASSERT(list_empty(&desc->bd_set_chain));
-
-        if (atomic_read(&desc->bd_refcount) != 0)
-                CERROR("freeing desc %p with refcount %d!\n", desc,
-                       atomic_read(&desc->bd_refcount));
 
+        LASSERT (desc != NULL);
+        LASSERT (desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
+        LASSERT (!desc->bd_network_rw);         /* network hands off or */
+        
         list_for_each_safe(tmp, next, &desc->bd_page_list) {
                 struct ptlrpc_bulk_page *bulk;
                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
                 ptlrpc_free_bulk_page(bulk);
         }
 
-        ptlrpc_put_connection(desc->bd_connection);
+        LASSERT (desc->bd_page_count == 0);
+        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
+
+        if (desc->bd_export)
+                class_export_put(desc->bd_export);
+        else
+                class_import_put(desc->bd_import);
 
         OBD_FREE(desc, sizeof(*desc));
         EXIT;
@@ -155,168 +212,666 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 
 void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
 {
-        ENTRY;
-        if (bulk == NULL) {
-                EXIT;
-                return;
-        }
-
+        LASSERT (bulk != NULL);
+        
         list_del(&bulk->bp_link);
         bulk->bp_desc->bd_page_count--;
         OBD_FREE(bulk, sizeof(*bulk));
-        EXIT;
 }
 
-static int ll_sync_brw_timeout(void *data)
+struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
+                                       int count, int *lengths, char **bufs)
 {
-        struct obd_brw_set *set = data;
-        struct list_head *tmp;
-        int failed = 0;
+        struct ptlrpc_request *request;
+        int rc;
         ENTRY;
 
-        LASSERT(set);
+        LASSERT((unsigned long)imp > 0x1000);
 
-        set->brw_flags |= PTL_RPC_FL_TIMEOUT;
+        OBD_ALLOC(request, sizeof(*request));
+        if (!request) {
+                CERROR("request allocation out of memory\n");
+                RETURN(NULL);
+        }
 
-        list_for_each(tmp, &set->brw_desc_head) {
-                struct ptlrpc_bulk_desc *desc =
-                        list_entry(tmp, struct ptlrpc_bulk_desc, bd_set_chain);
+        rc = lustre_pack_msg(count, lengths, bufs,
+                             &request->rq_reqlen, &request->rq_reqmsg);
+        if (rc) {
+                CERROR("cannot pack request %d\n", rc);
+                OBD_FREE(request, sizeof(*request));
+                RETURN(NULL);
+        }
 
-                /* Skip descriptors that were completed successfully. */
-                if (desc->bd_flags & (PTL_BULK_FL_RCVD | PTL_BULK_FL_SENT))
-                        continue;
+        request->rq_timeout = obd_timeout;
+        request->rq_level = LUSTRE_CONN_FULL;
+        request->rq_type = PTL_RPC_MSG_REQUEST;
+        request->rq_import = class_import_get(imp);
+        request->rq_phase = RQ_PHASE_NEW;
+        
+        /* XXX FIXME bug 249 */
+        request->rq_request_portal = imp->imp_client->cli_request_portal;
+        request->rq_reply_portal = imp->imp_client->cli_reply_portal;
 
-                LASSERT(desc->bd_connection);
-
-                /* If PtlMDUnlink succeeds, then bulk I/O on the MD hasn't
-                 * even started yet.  XXX where do we kunmup the thing?
-                 *
-                 * If it fail with PTL_MD_BUSY, then the network is still
-                 * reading/writing the buffers and we must wait for it to
-                 * complete (which it will within finite time, most
-                 * probably with failure; we really need portals error
-                 * events to detect that).
-                 *
-                 * Otherwise (PTL_INV_MD) it completed after the bd_flags
-                 * test above!
-                 */
-                if (PtlMDUnlink(desc->bd_md_h) != PTL_OK) {
-                        CERROR("Near-miss on OST %s -- need to adjust "
-                               "obd_timeout?\n",
-                               desc->bd_connection->c_remote_uuid.uuid);
-                        continue;
-                }
+        request->rq_connection = ptlrpc_connection_addref(imp->imp_connection);
 
-                CERROR("IO of %d pages to/from %s:%d (conn %p) timed out\n",
-                       desc->bd_page_count,
-                       desc->bd_connection->c_remote_uuid.uuid,
-                       desc->bd_portal, desc->bd_connection);
+        spin_lock_init (&request->rq_lock);
+        INIT_LIST_HEAD(&request->rq_list);
+        init_waitqueue_head(&request->rq_wait_for_rep);
+        request->rq_xid = ptlrpc_next_xid();
+        atomic_set(&request->rq_refcount, 1);
 
-                /* This one will "never" arrive, don't wait for it. */
-                if (atomic_dec_and_test(&set->brw_refcount))
-                        wake_up(&set->brw_waitq);
+        request->rq_reqmsg->opc = opcode;
+        request->rq_reqmsg->flags = 0;
 
-                if (class_signal_connection_failure)
-                        class_signal_connection_failure(desc->bd_connection);
-                else
-                        failed = 1;
+        RETURN(request);
+}
+
+struct ptlrpc_request_set *ptlrpc_prep_set(void)
+{
+        struct ptlrpc_request_set *set;
+
+        OBD_ALLOC(set, sizeof *set);
+        if (!set)
+                RETURN(NULL);
+        INIT_LIST_HEAD(&set->set_requests);
+        init_waitqueue_head(&set->set_waitq);
+        set->set_remaining = 0;
+
+        RETURN(set);
+}
+
+/* Finish with this set; opposite of prep_set. */
+void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
+{
+        struct list_head *tmp;
+        struct list_head *next;
+        int               expected_phase;
+        int               n = 0;
+        ENTRY;
+
+        /* Requests on the set should either all be completed, or all be new */
+        expected_phase = (set->set_remaining == 0) ? 
+                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
+        list_for_each (tmp, &set->set_requests) {
+                struct ptlrpc_request *req =
+                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+                LASSERT (req->rq_phase == expected_phase);
+                n++;
+        }
+        
+        LASSERT (set->set_remaining == 0 || set->set_remaining == n);
+        
+        list_for_each_safe(tmp, next, &set->set_requests) {
+                struct ptlrpc_request *req =
+                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+                list_del_init(&req->rq_set_chain);
+
+                LASSERT (req->rq_phase == expected_phase);
+
+                if (req->rq_phase == RQ_PHASE_NEW) {
+                        
+                        if (req->rq_interpret_reply != NULL) {
+                                int (*interpreter)(struct ptlrpc_request *, void *, int) =
+                                        req->rq_interpret_reply;
+                                
+                                /* higher level (i.e. LOV) failed; 
+                                 * let the sub reqs clean up */
+                                req->rq_status = -EBADR;
+                                interpreter(req, &req->rq_async_args, req->rq_status);
+                        }
+                        set->set_remaining--;
+                }
+
+                req->rq_set = NULL;
+                ptlrpc_req_finished (req);
         }
 
-        /* 0 = We go back to sleep, until we're resumed or interrupted */
-        /* 1 = We can't be recovered, just abort the syscall with -ETIMEDOUT */
-        RETURN(failed);
+        LASSERT(set->set_remaining == 0);
+
+        OBD_FREE(set, sizeof(*set));
+        EXIT;
 }
 
-static int ll_sync_brw_intr(void *data)
+void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
+                        struct ptlrpc_request *req)
 {
-        struct obd_brw_set *set = data;
+        /* The set takes over the caller's request reference */
+        list_add_tail(&req->rq_set_chain, &set->set_requests);
+        req->rq_set = set;
+        set->set_remaining++;
+}
 
+static int ptlrpc_check_reply(struct ptlrpc_request *req)
+{
+        unsigned long flags;
+        int rc = 0;
         ENTRY;
-        set->brw_flags |= PTL_RPC_FL_INTR;
-        RETURN(1); /* ignored, as of this writing */
+
+        /* serialise with network callback */
+        spin_lock_irqsave (&req->rq_lock, flags);
+
+        if (req->rq_replied) {
+                DEBUG_REQ(D_NET, req, "REPLIED:");
+                GOTO(out, rc = 1);
+        }
+
+        if (req->rq_err) {
+                DEBUG_REQ(D_ERROR, req, "ABORTED:");
+                GOTO(out, rc = 1);
+        }
+
+        if (req->rq_resend) {
+                DEBUG_REQ(D_ERROR, req, "RESEND:");
+                GOTO(out, rc = 1);
+        }
+
+        if (req->rq_restart) {
+                DEBUG_REQ(D_ERROR, req, "RESTART:");
+                GOTO(out, rc = 1);
+        }
+        EXIT;
+ out:
+        spin_unlock_irqrestore (&req->rq_lock, flags);
+        DEBUG_REQ(D_NET, req, "rc = %d for", rc);
+        return rc;
 }
 
-int ll_brw_sync_wait(struct obd_brw_set *set, int phase)
+static int ptlrpc_check_status(struct ptlrpc_request *req)
 {
-        struct l_wait_info lwi;
-        struct list_head *tmp, *next;
-        int rc = 0;
+        int err;
+        ENTRY;
+
+        err = req->rq_repmsg->status;
+        if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
+                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR (%d)", err);
+                if (err >= 0)
+                        CERROR("Error Reply has >= zero status\n");
+                RETURN(err < 0 ? err : -EINVAL);
+        }
+
+        if (err < 0) {
+                DEBUG_REQ(D_INFO, req, "status is %d", err);
+        } else if (err > 0) {
+                /* XXX: translate this error from net to host */
+                DEBUG_REQ(D_INFO, req, "status is %d", err);
+        }
+
+        RETURN(err);
+}
+
+#warning this needs to change after robert fixes eviction handling
+static int 
+after_reply(struct ptlrpc_request *req, int *restartp)
+{
+        unsigned long flags;
+        struct obd_import *imp = req->rq_import;
+        int rc;
         ENTRY;
 
-        obd_brw_set_addref(set);
-        switch(phase) {
-        case CB_PHASE_START:
-                lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ll_sync_brw_timeout,
-                                       ll_sync_brw_intr, set);
-                rc = l_wait_event(set->brw_waitq,
-                                  atomic_read(&set->brw_desc_count) == 0, &lwi);
-
-                list_for_each_safe(tmp, next, &set->brw_desc_head) {
-                        struct ptlrpc_bulk_desc *desc =
-                                list_entry(tmp, struct ptlrpc_bulk_desc,
-                                           bd_set_chain);
-                        list_del_init(&desc->bd_set_chain);
-                        ptlrpc_bulk_decref(desc);
+        LASSERT (!req->rq_receiving_reply);
+        LASSERT (req->rq_replied);
+
+        if (restartp != NULL)
+                *restartp = 0;
+        
+        /* NB Until this point, the whole of the incoming message,
+         * including buflens, status etc is in the sender's byte order. */
+
+#if SWAB_PARANOIA
+        /* Clear reply swab mask; this is a new reply in sender's byte order */
+        req->rq_rep_swab_mask = 0;
+#endif
+        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
+        if (rc) {
+                CERROR("unpack_rep failed: %d\n", rc);
+                RETURN (-EPROTO);
+        }
+
+        if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
+            req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
+                CERROR("invalid packet type received (type=%u)\n",
+                       req->rq_repmsg->type);
+                RETURN (-EPROTO);
+        }
+
+        /* Store transno in reqmsg for replay. */
+        req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
+
+        rc = ptlrpc_check_status(req);
+
+        /* Either we've been evicted, or the server has failed for
+         * some reason. Try to reconnect, and if that fails, punt to
+         * upcall */
+        if (rc == -ENOTCONN) {
+                if (req->rq_level < LUSTRE_CONN_FULL || req->rq_no_recov ||
+                    imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
+                        RETURN(-ENOTCONN);
                 }
-                break;
-        case CB_PHASE_FINISH:
-                if (atomic_dec_and_test(&set->brw_desc_count))
-                        wake_up(&set->brw_waitq);
-                break;
-        default:
+
+                rc = ptlrpc_request_handle_eviction(req);
+                if (rc)
+                        CERROR("can't reconnect to %s@%s: %d\n", 
+                               imp->imp_target_uuid.uuid,
+                               imp->imp_connection->c_remote_uuid.uuid, rc);
+                else
+                        ptlrpc_wake_delayed(imp);
+
+                if (req->rq_err)
+                        RETURN(-EIO);
+
+                if (req->rq_resend) {
+                        if (restartp == NULL)
+                                LBUG(); /* async resend not supported yet */
+                        spin_lock_irqsave (&req->rq_lock, flags);
+                        req->rq_resend = 0;
+                        spin_unlock_irqrestore (&req->rq_lock, flags);
+                        *restartp = 1;
+                        lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
+                        DEBUG_REQ(D_HA, req, "resending: ");
+                        RETURN (0);
+                }
+
+                CERROR("request should be err or resend: %p\n", req);
                 LBUG();
         }
-        obd_brw_set_decref(set);
 
+        if (req->rq_import->imp_replayable) {
+                spin_lock_irqsave(&imp->imp_lock, flags);
+                if ((req->rq_replay || req->rq_transno != 0) && rc >= 0)
+                        ptlrpc_retain_replayable_request(req, imp);
+
+                if (req->rq_transno > imp->imp_max_transno)
+                        imp->imp_max_transno = req->rq_transno;
+
+                /* Replay-enabled imports return commit-status information. */
+                if (req->rq_repmsg->last_committed) {
+                        if (req->rq_repmsg->last_committed < 
+                            imp->imp_peer_committed_transno) {
+                                CERROR("%s went back in time (transno "LPD64
+                                       " was committed, server claims "LPD64
+                                       ")! is shared storage not coherent?\n",
+                                       imp->imp_target_uuid.uuid,
+                                       imp->imp_peer_committed_transno,
+                                       req->rq_repmsg->last_committed);
+                        }
+                        imp->imp_peer_committed_transno =
+                                req->rq_repmsg->last_committed;
+                }
+                ptlrpc_free_committed(imp);
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+        }
+        
         RETURN(rc);
 }
 
-struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
-                                       int count, int *lengths, char **bufs)
+static int check_set(struct ptlrpc_request_set *set)
 {
-        struct ptlrpc_connection *conn;
-        struct ptlrpc_request *request;
-        int rc;
+        unsigned long flags;
+        struct list_head *tmp;
+        ENTRY;
+
+        if (set->set_remaining == 0)
+                RETURN(1);
+
+        list_for_each(tmp, &set->set_requests) {
+                struct ptlrpc_request *req =
+                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+                struct obd_import *imp = req->rq_import;
+                int rc = 0;
+
+                LASSERT (req->rq_phase == RQ_PHASE_RPC ||
+                         req->rq_phase == RQ_PHASE_BULK ||
+                         req->rq_phase == RQ_PHASE_COMPLETE);
+
+                if (req->rq_phase == RQ_PHASE_COMPLETE)
+                        continue;
+
+                if (req->rq_err) {
+                        ptlrpc_unregister_reply(req);
+                        if (req->rq_status == 0)
+                                req->rq_status = -EIO;
+                        req->rq_phase = RQ_PHASE_INTERPRET;
+                
+                        spin_lock_irqsave(&imp->imp_lock, flags);
+                        list_del_init(&req->rq_list);
+                        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+                        GOTO (interpret, req->rq_status);
+                } 
+                
+                if (req->rq_intr) {
+                        /* NB could be on delayed list */
+                        ptlrpc_unregister_reply(req);
+                        req->rq_status = -EINTR;
+                        req->rq_phase = RQ_PHASE_INTERPRET;
+                
+                        spin_lock_irqsave(&imp->imp_lock, flags);
+                        list_del_init(&req->rq_list);
+                        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+                        GOTO (interpret, req->rq_status);
+                }
+                
+                if (req->rq_phase == RQ_PHASE_RPC) {
+                        int do_restart = 0;
+                        if (req->rq_waiting || req->rq_resend) {
+                                spin_lock_irqsave(&imp->imp_lock, flags);
+                                
+                                if (req->rq_level > imp->imp_level) {
+                                        spin_unlock_irqrestore(&imp->imp_lock,
+                                                               flags);
+                                        continue;
+                                }
+                                
+                                list_del(&req->rq_list);
+                                list_add_tail(&req->rq_list,
+                                              &imp->imp_sending_list);
+                                spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+                                req->rq_waiting = 0;
+                                if (req->rq_resend) {
+                                        lustre_msg_add_flags(req->rq_reqmsg,
+                                                             MSG_RESENT);
+                                        spin_lock_irqsave(&req->rq_lock, flags);
+                                        req->rq_resend = 0;
+                                        spin_unlock_irqrestore(&req->rq_lock,
+                                                               flags);
+                                        ptlrpc_unregister_reply(req);
+                                        if (req->rq_bulk) 
+                                                ptlrpc_unregister_bulk(req);
+                               }
+                                
+                                rc = ptl_send_rpc(req);
+                                if (rc) {
+                                        req->rq_status = rc;
+                                        req->rq_phase = RQ_PHASE_INTERPRET;
+                                        GOTO (interpret, req->rq_status);
+                                }
+                                
+                        }
+                
+                        /* Ensure the network callback returned */
+                        spin_lock_irqsave (&req->rq_lock, flags);
+                        if (!req->rq_replied) {
+                                spin_unlock_irqrestore (&req->rq_lock, flags);
+                                continue;
+                        }
+                        spin_unlock_irqrestore (&req->rq_lock, flags);
+                
+                        spin_lock_irqsave(&imp->imp_lock, flags);
+                        list_del_init(&req->rq_list);
+                        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+                        req->rq_status = after_reply(req, &do_restart);
+                        if (do_restart) {
+                                req->rq_resend = 1; /* ugh */
+                                continue;
+                        }
+                        
+                        if (req->rq_bulk == NULL) {
+                                req->rq_phase = RQ_PHASE_INTERPRET;
+                                GOTO (interpret, req->rq_status);
+                        }
+
+                        req->rq_phase = RQ_PHASE_BULK;
+                }
+
+                LASSERT (req->rq_phase == RQ_PHASE_BULK);
+                if (!ptlrpc_bulk_complete (req->rq_bulk))
+                        continue;
+                
+                req->rq_phase = RQ_PHASE_INTERPRET;
+                
+        interpret:
+                LASSERT (req->rq_phase == RQ_PHASE_INTERPRET);
+                LASSERT (!req->rq_receiving_reply);
+
+                if (req->rq_bulk != NULL)
+                        ptlrpc_unregister_bulk (req);
+                
+                if (req->rq_interpret_reply != NULL) {
+                        int (*interpreter)(struct ptlrpc_request *, void *, int) =
+                                req->rq_interpret_reply;
+                        req->rq_status = interpreter(req, &req->rq_async_args, 
+                                                     req->rq_status);
+                }
+
+                CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
+                       "opc %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+                       imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
+                       req->rq_xid,
+                       imp->imp_connection->c_peer.peer_ni->pni_name,
+                       imp->imp_connection->c_peer.peer_nid,
+                       req->rq_reqmsg->opc);
+
+                req->rq_phase = RQ_PHASE_COMPLETE;
+                set->set_remaining--;
+        }
+
+        RETURN (set->set_remaining == 0);
+}
+
+static int expire_one_request(struct ptlrpc_request *req)
+{
+        unsigned long      flags;
+        struct obd_import *imp = req->rq_import;
+        ENTRY;
+
+        DEBUG_REQ(D_ERROR, req, "timeout");
+
+        spin_lock_irqsave (&req->rq_lock, flags);
+        req->rq_timedout = 1;
+        spin_unlock_irqrestore (&req->rq_lock, flags);
+
+        ptlrpc_unregister_reply (req);
+
+        if (imp == NULL) {
+                DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
+                RETURN(1);
+        }
+
+        /* The DLM server doesn't want recovery run on its imports. */
+        if (imp->imp_dlm_fake)
+                RETURN(1);
+
+        /* If this request is for recovery or other primordial tasks,
+         * don't go back to sleep, and don't start recovery again.. */
+        if (req->rq_level < LUSTRE_CONN_FULL || req->rq_no_recov ||
+            imp->imp_obd->obd_no_recov)
+                RETURN(1);
+
+        ptlrpc_fail_import(imp, req->rq_import_generation);
+
+        RETURN(0);
+}
+
+static int expired_set(void *data)
+{
+        struct ptlrpc_request_set *set = data;
+        struct list_head          *tmp;
+        time_t                     now = LTIME_S (CURRENT_TIME);
         ENTRY;
 
-        LASSERT((unsigned long)imp > 0x1000);
-        conn = imp->imp_connection;
+        LASSERT (set != NULL);
+        CERROR("EXPIRED SET %p\n", set);
+
+        /* A timeout expired; see which reqs it applies to... */
+        list_for_each (tmp, &set->set_requests) {
+                struct ptlrpc_request *req =
+                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+                /* request in-flight? */
+                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
+                      (req->rq_phase == RQ_PHASE_BULK)))
+                        continue;
+                
+                if (req->rq_timedout ||           /* already dealt with */
+                    req->rq_sent + req->rq_timeout > now) /* not expired */
+                        continue;
+
+                /* deal with this guy */
+                expire_one_request (req);
+        }
+
+        /* When waiting for a whole set, we always to break out of the
+         * sleep so we can recalculate the timeout, or enable interrupts
+         * iff everyone's timed out.
+         */
+        RETURN(1);
+}
+
+static void interrupted_set(void *data)
+{
+        struct ptlrpc_request_set *set = data;
+        struct list_head *tmp;
+        unsigned long flags;
+
+        LASSERT (set != NULL);
+        CERROR("INTERRUPTED SET %p\n", set);
+
+        list_for_each(tmp, &set->set_requests) {
+                struct ptlrpc_request *req =
+                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+                if (req->rq_phase != RQ_PHASE_RPC)
+                        continue;
+                
+                spin_lock_irqsave (&req->rq_lock, flags);
+                req->rq_intr = 1;
+                spin_unlock_irqrestore (&req->rq_lock, flags);
+        }
+}
+
+int ptlrpc_set_wait(struct ptlrpc_request_set *set)
+{
+        struct list_head      *tmp;
+        struct obd_import     *imp;
+        struct ptlrpc_request *req;
+        struct l_wait_info     lwi;
+        unsigned long          flags;
+        int                    rc;
+        time_t                 now;
+        time_t                 deadline;
+        int                    timeout;
+        ENTRY;
+
+        list_for_each(tmp, &set->set_requests) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+                LASSERT (req->rq_level == LUSTRE_CONN_FULL);
+                LASSERT (req->rq_phase == RQ_PHASE_NEW);
+                req->rq_phase = RQ_PHASE_RPC;
+                
+                imp = req->rq_import;
+                spin_lock_irqsave(&imp->imp_lock, flags);
+
+                if (imp->imp_invalid) {
+                        spin_unlock_irqrestore(&imp->imp_lock, flags);
+                        req->rq_status = -EIO;
+                        req->rq_phase = RQ_PHASE_INTERPRET;
+                        continue;
+                }
 
-        OBD_ALLOC(request, sizeof(*request));
-        if (!request) {
-                CERROR("request allocation out of memory\n");
-                RETURN(NULL);
-        }
+                if (req->rq_level > imp->imp_level) {
+                        if (req->rq_no_recov || imp->imp_obd->obd_no_recov ||
+                            imp->imp_dlm_fake) {
+                                spin_unlock_irqrestore(&imp->imp_lock, flags);
+                                req->rq_status = -EWOULDBLOCK;
+                                req->rq_phase = RQ_PHASE_INTERPRET;
+                                continue;
+                        }
+
+                        spin_lock (&req->rq_lock);
+                        req->rq_waiting = 1;
+                        spin_unlock (&req->rq_lock);
+                        LASSERT (list_empty (&req->rq_list));
+                        // list_del(&req->rq_list);
+                        list_add_tail(&req->rq_list, &imp->imp_delayed_list);
+                        spin_unlock_irqrestore(&imp->imp_lock, flags);
+                        continue;
+                }
 
-        rc = lustre_pack_msg(count, lengths, bufs,
-                             &request->rq_reqlen, &request->rq_reqmsg);
-        if (rc) {
-                CERROR("cannot pack request %d\n", rc);
-                OBD_FREE(request, sizeof(*request));
-                RETURN(NULL);
-        }
+                /* XXX this is the same as ptlrpc_queue_wait */
+                LASSERT(list_empty(&req->rq_list));
+                list_add_tail(&req->rq_list, &imp->imp_sending_list);
+                req->rq_import_generation = imp->imp_generation;
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
 
-        request->rq_timeout = obd_timeout;
-        request->rq_level = LUSTRE_CONN_FULL;
-        request->rq_type = PTL_RPC_MSG_REQUEST;
-        request->rq_import = imp;
+                CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
+                       " %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+                       imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
+                       req->rq_xid,
+                       imp->imp_connection->c_peer.peer_ni->pni_name,
+                       imp->imp_connection->c_peer.peer_nid,
+                       req->rq_reqmsg->opc);
 
-        /* XXX FIXME bug 625069, now 249 */
-        request->rq_request_portal = imp->imp_client->cli_request_portal;
-        request->rq_reply_portal = imp->imp_client->cli_reply_portal;
+                rc = ptl_send_rpc(req);
+                if (rc) {
+                        req->rq_status = rc;
+                        req->rq_phase = RQ_PHASE_INTERPRET;
+                }
+        }
 
-        request->rq_connection = ptlrpc_connection_addref(conn);
+        do {
+                now = LTIME_S (CURRENT_TIME);
+                timeout = 0;
+                list_for_each (tmp, &set->set_requests) {
+                        req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
-        INIT_LIST_HEAD(&request->rq_list);
-        atomic_set(&request->rq_refcount, 1);
+                        /* request in-flight? */
+                        if (!((req->rq_phase == RQ_PHASE_RPC &&
+                               !req->rq_waiting) ||
+                              (req->rq_phase == RQ_PHASE_BULK)))
+                                continue;
 
-        request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC;
-        request->rq_reqmsg->version = PTLRPC_MSG_VERSION;
-        request->rq_reqmsg->opc = HTON__u32(opcode);
-        request->rq_reqmsg->flags = 0;
+                        if (req->rq_timedout)   /* already timed out */
+                                continue;
+                        
+                        deadline = req->rq_sent + req->rq_timeout;
+                        if (deadline <= now)    /* actually expired already */
+                                timeout = 1;    /* ASAP */
+                        else if (timeout == 0 || timeout > deadline - now)
+                                timeout = deadline - now;
+                }
 
-        ptlrpc_hdl2req(request, &imp->imp_handle);
-        RETURN(request);
+                /* wait until all complete, interrupted, or an in-flight
+                 * req times out */
+                CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
+                       set, timeout);
+                lwi = LWI_TIMEOUT_INTR(timeout * HZ, 
+                                       expired_set, interrupted_set, set);
+                rc = l_wait_event(set->set_waitq, check_set(set), &lwi);
+                
+                LASSERT (rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
+
+                /* -EINTR => all requests have been flagged rq_intr so next
+                 * check completes.
+                 * -ETIMEOUTD => someone timed out.  When all reqs have
+                 * timed out, signals are enabled allowing completion with
+                 * EINTR.
+                 * I don't really care if we go once more round the loop in
+                 * the error cases -eeb. */
+        } while (rc != 0);
+
+        LASSERT (set->set_remaining == 0);
+
+        rc = 0;
+        list_for_each(tmp, &set->set_requests) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+                LASSERT (req->rq_phase == RQ_PHASE_COMPLETE);
+                if (req->rq_status != 0)
+                        rc = req->rq_status;
+        }
+        
+        if (set->set_interpret != NULL) {
+                int (*interpreter)(struct ptlrpc_request_set *set, void *, int) =
+                        set->set_interpret;
+                rc = interpreter (set, &set->set_args, rc);
+        }
+        
+        RETURN(rc);
 }
 
 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
@@ -327,9 +882,11 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                 return;
         }
 
+        LASSERT (!request->rq_receiving_reply);
+        
         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
-        if (request->rq_import) {
+        if (request->rq_import != NULL) {
                 unsigned long flags = 0;
                 if (!locked)
                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
@@ -340,23 +897,29 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         }
 
         if (atomic_read(&request->rq_refcount) != 0) {
-                CERROR("freeing request %p (%d->%s:%d) with refcount %d\n",
-                       request, request->rq_reqmsg->opc,
-                       request->rq_connection->c_remote_uuid.uuid,
-                       request->rq_import->imp_client->cli_request_portal,
-                       atomic_read (&request->rq_refcount));
+                DEBUG_REQ(D_ERROR, request,
+                          "freeing request with nonzero refcount");
                 LBUG();
         }
 
         if (request->rq_repmsg != NULL) {
                 OBD_FREE(request->rq_repmsg, request->rq_replen);
                 request->rq_repmsg = NULL;
-                request->rq_reply_md.start = NULL;
         }
         if (request->rq_reqmsg != NULL) {
                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
                 request->rq_reqmsg = NULL;
         }
+        if (request->rq_export != NULL) {
+                class_export_put(request->rq_export);
+                request->rq_export = NULL;
+        }
+        if (request->rq_import != NULL) {
+                class_import_put(request->rq_import);
+                request->rq_import = NULL;
+        }
+        if (request->rq_bulk != NULL)
+                ptlrpc_free_bulk(request->rq_bulk);
 
         ptlrpc_put_connection(request->rq_connection);
         OBD_FREE(request, sizeof(*request));
@@ -396,81 +959,81 @@ void ptlrpc_req_finished(struct ptlrpc_request *request)
         __ptlrpc_req_finished(request, 0);
 }
 
-static int ptlrpc_check_reply(struct ptlrpc_request *req)
+static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
 {
-        int rc = 0;
-
-        ENTRY;
-        if (req->rq_repmsg != NULL) {
-                req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
-                /* Store transno in reqmsg for replay. */
-                req->rq_reqmsg->transno = req->rq_repmsg->transno;
-                req->rq_flags |= PTL_RPC_FL_REPLIED;
-                GOTO(out, rc = 1);
-        }
-
-        if (req->rq_flags & PTL_RPC_FL_RESEND) {
-                DEBUG_REQ(D_ERROR, req, "RESEND:");
-                GOTO(out, rc = 1);
-        }
-
-        if (req->rq_flags & PTL_RPC_FL_ERR) {
-                ENTRY;
-                DEBUG_REQ(D_ERROR, req, "ABORTED:");
-                GOTO(out, rc = 1);
-        }
-
-        if (req->rq_flags & PTL_RPC_FL_RESTART) {
-                DEBUG_REQ(D_ERROR, req, "RESTART:");
-                GOTO(out, rc = 1);
-        }
-        EXIT;
- out:
-        DEBUG_REQ(D_NET, req, "rc = %d for", rc);
-        return rc;
+        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+        request->rq_reqmsg = NULL;
+        request->rq_reqlen = 0;
 }
 
-static int ptlrpc_check_status(struct ptlrpc_request *req)
+/* Disengage the client's reply buffer from the network
+ * NB does _NOT_ unregister any client-side bulk. 
+ * IDEMPOTENT, but _not_ safe against concurrent callers.
+ * The request owner (i.e. the thread doing the I/O) must call...
+ */
+void ptlrpc_unregister_reply (struct ptlrpc_request *request)
 {
-        int err;
+        unsigned long flags;
+        int           rc;
         ENTRY;
 
-        err = req->rq_repmsg->status;
-        if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) {
-                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR (%d)", err);
-                RETURN(err ? err : -EINVAL);
-        }
+        LASSERT (!in_interrupt ());             /* might sleep */
 
-        if (err < 0) {
-                DEBUG_REQ(D_INFO, req, "status is %d", err);
-        } else if (err > 0) {
-                /* XXX: translate this error from net to host */
-                DEBUG_REQ(D_INFO, req, "status is %d", err);
+        spin_lock_irqsave (&request->rq_lock, flags);
+        if (!request->rq_receiving_reply) {     /* not waiting for a reply */
+                spin_unlock_irqrestore (&request->rq_lock, flags);
+                EXIT;
+                /* NB reply buffer not freed here */
+                return;
         }
 
-        RETURN(err);
-}
-
-static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
-{
-        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
-        request->rq_reqmsg = NULL;
-        request->rq_reqlen = 0;
-}
-
-/* Abort this request and cleanup any resources associated with it. */
-int ptlrpc_abort(struct ptlrpc_request *request)
-{
-        /* First remove the ME for the reply; in theory, this means
-         * that we can tear down the buffer safely. */
-        if (PtlMEUnlink(request->rq_reply_me_h) != PTL_OK)
-                RETURN(0);
-        OBD_FREE(request->rq_reply_md.start, request->rq_replen);
+        LASSERT (!request->rq_replied);         /* callback hasn't completed */
+        spin_unlock_irqrestore (&request->rq_lock, flags);
+        
+        rc = PtlMDUnlink (request->rq_reply_md_h);
+        switch (rc) {
+        default:
+                LBUG ();
+
+        case PTL_OK:                            /* unlinked before completion */
+                LASSERT (request->rq_receiving_reply);
+                LASSERT (!request->rq_replied);
+                spin_lock_irqsave (&request->rq_lock, flags);
+                request->rq_receiving_reply = 0;
+                spin_unlock_irqrestore (&request->rq_lock, flags);
+                OBD_FREE(request->rq_repmsg, request->rq_replen);
+                request->rq_repmsg = NULL;
+                EXIT;
+                return;
+                
+        case PTL_MD_INUSE:                      /* callback in progress */
+                for (;;) {
+                        /* Network access will complete in finite time but
+                         * the timeout lets us CERROR for visibility */
+                        struct l_wait_info lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
+                        
+                        rc = l_wait_event (request->rq_wait_for_rep,
+                                           request->rq_replied, &lwi);
+                        LASSERT (rc == 0 || rc == -ETIMEDOUT);
+                        if (rc == 0) {
+                                spin_lock_irqsave (&request->rq_lock, flags);
+                                /* Ensure the callback has completed scheduling me 
+                                 * and taken its hands off the request */
+                                spin_unlock_irqrestore (&request->rq_lock, flags);
+                                break;
+                        }
+                        
+                        CERROR ("Unexpectedly long timeout: req %p\n", request);
+                }
+                /* fall through */
 
-        memset(&request->rq_reply_me_h, 0, sizeof(request->rq_reply_me_h));
-        request->rq_reply_md.start = NULL;
-        request->rq_repmsg = NULL;
-        return 0;
+        case PTL_INV_MD:                        /* callback completed */
+                LASSERT (!request->rq_receiving_reply);
+                LASSERT (request->rq_replied);
+                EXIT;
+                return;
+        }
+        /* Not Reached */
 }
 
 /* caller must hold imp->imp_lock */
@@ -478,6 +1041,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
 {
         struct list_head *tmp, *saved;
         struct ptlrpc_request *req;
+        struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
         ENTRY;
 
         LASSERT(imp != NULL);
@@ -492,7 +1056,11 @@ void ptlrpc_free_committed(struct obd_import *imp)
         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
-                if (req->rq_flags & PTL_RPC_FL_REPLAY) {
+                /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
+                LASSERT (req != last_req);
+                last_req = req;
+
+                if (req->rq_replay) {
                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
                         continue;
                 }
@@ -515,104 +1083,67 @@ void ptlrpc_free_committed(struct obd_import *imp)
 
 void ptlrpc_cleanup_client(struct obd_import *imp)
 {
-        struct list_head *tmp, *saved;
-        struct ptlrpc_request *req;
-        struct ptlrpc_connection *conn = imp->imp_connection;
-        unsigned long flags;
         ENTRY;
-
-        LASSERT(conn);
-
-        spin_lock_irqsave(&imp->imp_lock, flags);
-        list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-
-                /* XXX we should make sure that nobody's sleeping on these! */
-                DEBUG_REQ(D_HA, req, "cleaning up from sending list");
-                list_del_init(&req->rq_list);
-                req->rq_import = NULL;
-                __ptlrpc_req_finished(req, 0);
-        }
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
-
         EXIT;
         return;
 }
 
-void ptlrpc_continue_req(struct ptlrpc_request *req)
-{
-        DEBUG_REQ(D_HA, req, "continuing delayed request");
-        req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
-        req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
-        wake_up(&req->rq_wait_for_rep);
-}
-
 void ptlrpc_resend_req(struct ptlrpc_request *req)
 {
+        unsigned long flags;
+        
         DEBUG_REQ(D_HA, req, "resending");
-        req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
-        req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
+        req->rq_reqmsg->handle.cookie = 0;
+        ptlrpc_put_connection(req->rq_connection);
+        req->rq_connection =
+                ptlrpc_connection_addref(req->rq_import->imp_connection);
         req->rq_status = -EAGAIN;
-        req->rq_level = LUSTRE_CONN_RECOVD;
-        req->rq_flags |= PTL_RPC_FL_RESEND;
-        req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
-        wake_up(&req->rq_wait_for_rep);
+
+        spin_lock_irqsave (&req->rq_lock, flags);
+        req->rq_resend = 1;
+        req->rq_timedout = 0;
+        if (req->rq_set != NULL)
+                wake_up (&req->rq_set->set_waitq);
+        else
+                wake_up(&req->rq_wait_for_rep);
+        spin_unlock_irqrestore (&req->rq_lock, flags);
 }
 
+/* XXX: this function and rq_status are currently unused */
 void ptlrpc_restart_req(struct ptlrpc_request *req)
 {
+        unsigned long flags;
+
         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
         req->rq_status = -ERESTARTSYS;
-        req->rq_flags |= PTL_RPC_FL_RESTART;
-        req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
-        wake_up(&req->rq_wait_for_rep);
+
+        spin_lock_irqsave (&req->rq_lock, flags);
+        req->rq_restart = 1;
+        req->rq_timedout = 0;
+        if (req->rq_set != NULL)
+                wake_up (&req->rq_set->set_waitq);
+        else
+                wake_up(&req->rq_wait_for_rep);
+        spin_unlock_irqrestore (&req->rq_lock, flags);
 }
 
 static int expired_request(void *data)
 {
         struct ptlrpc_request *req = data;
-
         ENTRY;
-        if (!req) {
-                CERROR("NULL req!");
-                LBUG();
-                RETURN(0);
-        }
-
-        DEBUG_REQ(D_ERROR, req, "timeout");
-        ptlrpc_abort(req);
-        req->rq_flags |= PTL_RPC_FL_TIMEOUT;
-
-        if (!req->rq_import) {
-                DEBUG_REQ(D_HA, req, "NULL import; already cleaned up?");
-                RETURN(1);
-        }
-
-        if (!req->rq_import->imp_connection) {
-                DEBUG_REQ(D_ERROR, req, "NULL connection");
-                LBUG();
-                RETURN(0);
-        }
-
-        if (!req->rq_import->imp_connection->c_recovd_data.rd_recovd)
-                RETURN(1);
-
-        recovd_conn_fail(req->rq_import->imp_connection);
 
-        /* If this request is for recovery or other primordial tasks,
-         * don't go back to sleep.
-         */
-        if (req->rq_level < LUSTRE_CONN_FULL)
-                RETURN(1);
-        RETURN(0);
+        RETURN(expire_one_request(req));
 }
 
-static int interrupted_request(void *data)
+static void interrupted_request(void *data)
 {
+        unsigned long flags;
+        
         struct ptlrpc_request *req = data;
-        ENTRY;
-        req->rq_flags |= PTL_RPC_FL_INTR;
-        RETURN(1); /* ignored, as of this writing */
+        DEBUG_REQ(D_HA, req, "request interrupted");
+        spin_lock_irqsave (&req->rq_lock, flags);
+        req->rq_intr = 1;
+        spin_unlock_irqrestore (&req->rq_lock, flags);
 }
 
 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
@@ -631,7 +1162,7 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
         LASSERT(spin_is_locked(&imp->imp_lock));
 #endif
 
-        LASSERT(imp->imp_flags & IMP_REPLAYABLE);
+        LASSERT(imp->imp_replayable);
         /* Balanced in ptlrpc_free_committed, usually. */
         ptlrpc_request_addref(req);
         list_for_each_prev(tmp, &imp->imp_replay_list) {
@@ -642,6 +1173,7 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                  * open a file, or for closes retained if to match creating
                  * opens, so use req->rq_xid as a secondary key.
                  * (See bugs 684, 685, and 428.)
+                 * XXX no longer needed, but all opens need transnos!
                  */
                 if (iter->rq_transno > req->rq_transno)
                         continue;
@@ -662,196 +1194,228 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
 int ptlrpc_queue_wait(struct ptlrpc_request *req)
 {
         int rc = 0;
+        int brc;
         struct l_wait_info lwi;
         struct obd_import *imp = req->rq_import;
+        struct obd_device *obd = imp->imp_obd;
         struct ptlrpc_connection *conn = imp->imp_connection;
         unsigned int flags;
+        int do_restart = 0;
+        int timeout = 0;
         ENTRY;
 
-        init_waitqueue_head(&req->rq_wait_for_rep);
-
-        req->rq_xid = HTON__u32(ptlrpc_next_xid());
-
+        LASSERT (req->rq_set == NULL);
+        LASSERT (!req->rq_receiving_reply);
+        
         /* for distributed debugging */
-        req->rq_reqmsg->status = HTON__u32(current->pid);
-        CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%s:"LPX64
-               ":%d\n", NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
+        req->rq_reqmsg->status = current->pid;
+        LASSERT(imp->imp_obd != NULL);
+        CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
+               "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+               imp->imp_obd->obd_uuid.uuid,
+               req->rq_reqmsg->status, req->rq_xid,
                conn->c_peer.peer_ni->pni_name, conn->c_peer.peer_nid,
-               NTOH__u32(req->rq_reqmsg->opc));
-
-        spin_lock_irqsave(&imp->imp_lock, flags);
+               req->rq_reqmsg->opc);
 
+        /* Mark phase here for a little debug help */
+        req->rq_phase = RQ_PHASE_RPC;
+        
+restart:
         /*
          * If the import has been invalidated (such as by an OST failure), the
-         * request must fail with -EIO.
+         * request must fail with -EIO.  Recovery requests are allowed to go
+         * through, though, so that they have a chance to revalidate the
+         * import.
          */
-        if (req->rq_import->imp_flags & IMP_INVALID) {
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        if (req->rq_import->imp_invalid && req->rq_level == LUSTRE_CONN_FULL) {
                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID:");
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
-                RETURN(-EIO);
+                GOTO (out, rc = -EIO);
         }
 
         if (req->rq_level > imp->imp_level) {
                 list_del(&req->rq_list);
+                if (req->rq_no_recov || obd->obd_no_recov ||
+                    imp->imp_dlm_fake) {
+                        spin_unlock_irqrestore(&imp->imp_lock, flags);
+                        GOTO (out, rc = -EWOULDBLOCK);
+                }
+
                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
 
-                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d < %d)",
+                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d > %d)",
                           current->comm, req->rq_level, imp->imp_level);
                 lwi = LWI_INTR(NULL, NULL);
                 rc = l_wait_event(req->rq_wait_for_rep,
-                                  (req->rq_level <= imp->imp_level) ||
-                                  (req->rq_flags & PTL_RPC_FL_ERR), &lwi);
-
-                if (req->rq_flags & PTL_RPC_FL_ERR)
-                        rc = -EIO;
-
-                if (!req->rq_import)
-                        RETURN(rc);
+                                  (req->rq_level <= imp->imp_level ||
+                                   req->rq_err),
+                                  &lwi);
+                DEBUG_REQ(D_HA, req, "\"%s\" awake: (%d > %d)",
+                          current->comm, req->rq_level, imp->imp_level);
 
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 list_del_init(&req->rq_list);
 
+                if (req->rq_err)
+                        rc = -EIO;
+
                 if (rc) {
                         spin_unlock_irqrestore(&imp->imp_lock, flags);
-                        RETURN(rc);
+                        GOTO (out, rc);
                 }
-
+                
                 CERROR("process %d resumed\n", current->pid);
         }
- resend:
 
+        /* XXX this is the same as ptlrpc_set_wait */
         LASSERT(list_empty(&req->rq_list));
         list_add_tail(&req->rq_list, &imp->imp_sending_list);
+        req->rq_import_generation = imp->imp_generation;
         spin_unlock_irqrestore(&imp->imp_lock, flags);
+
         rc = ptl_send_rpc(req);
         if (rc) {
-                CDEBUG(D_HA, "error %d, opcode %d, need recovery\n", rc,
-                       req->rq_reqmsg->opc);
-                /* sleep for a jiffy, then trigger recovery */
-                lwi = LWI_TIMEOUT_INTR(1, expired_request,
-                                       interrupted_request, req);
+                /* The DLM's fake imports want to avoid all forms of
+                 * recovery. */
+                if (imp->imp_dlm_fake) {
+                        spin_lock_irqsave(&imp->imp_lock, flags);
+                        list_del_init(&req->rq_list);
+                        spin_unlock_irqrestore(&imp->imp_lock, flags);
+                        GOTO(out, rc);
+                }
+
+                DEBUG_REQ(D_ERROR, req, "send failed (%d); recovering", rc);
+                
+                ptlrpc_fail_import(imp, req->rq_import_generation);
+
+                /* If we've been told to not wait, we're done. */
+                if (req->rq_level < LUSTRE_CONN_FULL || req->rq_no_recov ||
+                    obd->obd_no_recov) {
+                        spin_lock_irqsave(&imp->imp_lock, flags);
+                        list_del_init(&req->rq_list);
+                        spin_unlock_irqrestore(&imp->imp_lock, flags);
+                        GOTO(out, rc);
+                }
+
+                /* If we errored, allow the user to interrupt immediately */
+                timeout = 1;
         } else {
+                timeout = req->rq_timeout * HZ;
                 DEBUG_REQ(D_NET, req, "-- sleeping");
-                lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
-                                       interrupted_request, req);
         }
 #ifdef __KERNEL__
+        lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
+                               req);
         l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
-#else 
-        { 
+#else
+        {
                 extern int reply_in_callback(ptl_event_t *ev);
                 ptl_event_t reply_ev;
-                PtlEQWait(req->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h, &reply_ev);
-                reply_in_callback(&reply_ev); 
+                PtlEQWait(req->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h,
+                          &reply_ev);
+                reply_in_callback(&reply_ev);
+
+                LASSERT (reply_ev.mem_desc.user_ptr == (void *)req);
+                // ptlrpc_check_reply(req);
+                // not required now it only tests
         }
-#endif 
+#endif
 
         DEBUG_REQ(D_NET, req, "-- done sleeping");
 
+        CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
+               "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+               imp->imp_obd->obd_uuid.uuid,
+               req->rq_reqmsg->status, req->rq_xid,
+               conn->c_peer.peer_ni->pni_name, conn->c_peer.peer_nid,
+               req->rq_reqmsg->opc);
+
         spin_lock_irqsave(&imp->imp_lock, flags);
         list_del_init(&req->rq_list);
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
-        if (req->rq_flags & PTL_RPC_FL_ERR) {
-                ptlrpc_abort(req);
+        /* If the reply was received normally, this just grabs the spinlock
+         * (ensuring the reply callback has returned), sees that
+         * req->rq_receiving_reply is clear and returns. */
+        ptlrpc_unregister_reply (req);
+        
+        if (req->rq_err)
                 GOTO(out, rc = -EIO);
-        }
 
-        /* Don't resend if we were interrupted. */
-        if ((req->rq_flags & (PTL_RPC_FL_RESEND | PTL_RPC_FL_INTR)) ==
-            PTL_RPC_FL_RESEND) {
-                if (req->rq_flags & PTL_RPC_FL_NO_RESEND) {
-                        ptlrpc_abort(req); /* clean up reply buffers */
-                        req->rq_flags &= ~PTL_RPC_FL_NO_RESEND;
+        /* Resend if we need to, unless we were interrupted. */
+        if (req->rq_resend && !req->rq_intr) {
+                /* ...unless we were specifically told otherwise. */
+                if (req->rq_no_resend) {
+                        spin_lock_irqsave (&req->rq_lock, flags);
+                        req->rq_no_resend = 0;
+                        spin_unlock_irqrestore (&req->rq_lock, flags);
                         GOTO(out, rc = -ETIMEDOUT);
                 }
-                req->rq_flags &= ~PTL_RPC_FL_RESEND;
+                spin_lock_irqsave (&req->rq_lock, flags);
+                req->rq_resend = 0;
+                spin_unlock_irqrestore (&req->rq_lock, flags);
                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
+
+                if (req->rq_bulk != NULL)
+                        ptlrpc_unregister_bulk (req);
+        
                 DEBUG_REQ(D_HA, req, "resending: ");
-                spin_lock_irqsave(&imp->imp_lock, flags);
-                goto resend;
+                goto restart;
         }
 
-        if (req->rq_flags & PTL_RPC_FL_INTR) {
-                if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT))
-                        LBUG(); /* should only be interrupted if we timed out */
-                /* Clean up the dangling reply buffers */
-                ptlrpc_abort(req);
+        if (req->rq_intr) {
+                /* Should only be interrupted if we timed out. */
+                if (!req->rq_timedout)
+                        DEBUG_REQ(D_ERROR, req,
+                                  "rq_intr set but rq_timedout not");
                 GOTO(out, rc = -EINTR);
         }
 
-        if (req->rq_flags & PTL_RPC_FL_TIMEOUT)
+        if (req->rq_timedout) {                 /* non-recoverable timeout */
                 GOTO(out, rc = -ETIMEDOUT);
-
-        if (!(req->rq_flags & PTL_RPC_FL_REPLIED))
-                GOTO(out, rc = req->rq_status);
-
-        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
-        if (rc) {
-                CERROR("unpack_rep failed: %d\n", rc);
-                GOTO(out, rc);
         }
-#if 0
-        /* FIXME: Enable when BlueArc makes new release */
-        if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
-            req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
-                CERROR("invalid packet type received (type=%u)\n",
-                       req->rq_repmsg->type);
+        
+        if (!req->rq_replied) {
+                /* How can this be? -eeb */
+                DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
                 LBUG();
-                GOTO(out, rc = -EINVAL);
+                GOTO(out, rc = req->rq_status);
         }
-#endif
-        DEBUG_REQ(D_NET, req, "status %d", req->rq_repmsg->status);
 
-        /* We're a rejected connection, need to invalidate and rebuild. */
-        if (req->rq_repmsg->status == -ENOTCONN) {
-                spin_lock_irqsave(&imp->imp_lock, flags);
-                /* If someone else is reconnecting us (CONN_RECOVD) or has
-                 * already completed it (handle mismatch), then we just need
-                 * to get out.
-                 */
-                if (imp->imp_level == LUSTRE_CONN_RECOVD ||
-                    imp->imp_handle.addr != req->rq_reqmsg->addr ||
-                    imp->imp_handle.cookie != req->rq_reqmsg->cookie) {
-                        spin_unlock_irqrestore(&imp->imp_lock, flags);
-                        GOTO(out, rc = -EIO);
-                }
-                imp->imp_level = LUSTRE_CONN_RECOVD;
-                spin_unlock_irqrestore(&imp->imp_lock, flags);
-                if (imp->imp_recover != NULL) {
-                        rc = imp->imp_recover(imp, PTLRPC_RECOVD_PHASE_NOTCONN);
-                        if (rc)
-                                LBUG();
-                }
-                GOTO(out, rc = -EIO);
+        rc = after_reply (req, &do_restart);
+        /* NB may return +ve success rc */
+        if (do_restart) {
+                if (req->rq_bulk != NULL)
+                        ptlrpc_unregister_bulk (req);
+                DEBUG_REQ(D_HA, req, "resending: ");
+                goto restart;
         }
 
-        rc = ptlrpc_check_status(req);
-
-        if (req->rq_import->imp_flags & IMP_REPLAYABLE) {
-                spin_lock_irqsave(&imp->imp_lock, flags);
-                if ((req->rq_flags & PTL_RPC_FL_REPLAY || req->rq_transno != 0)
-                    && rc >= 0) {
-                        ptlrpc_retain_replayable_request(req, imp);
-                }
-
-                if (req->rq_transno > imp->imp_max_transno) {
-                        imp->imp_max_transno = req->rq_transno;
+ out:
+        if (req->rq_bulk != NULL) {
+                if (rc >= 0) {                  /* success so far */
+                        lwi = LWI_TIMEOUT (timeout, NULL, NULL);
+                        brc = l_wait_event (req->rq_wait_for_rep, 
+                                            ptlrpc_bulk_complete (req->rq_bulk), &lwi);
+                        if (brc != 0) {
+                                LASSERT (brc == -ETIMEDOUT);
+                                CERROR ("Timed out waiting for bulk\n");
+                                rc = brc;
+                        }
                 }
-
-                /* Replay-enabled imports return commit-status information. */
-                if (req->rq_repmsg->last_committed) {
-                        imp->imp_peer_committed_transno =
-                                req->rq_repmsg->last_committed;
+                if (rc < 0) {
+                        /* MDS blocks for put ACKs before replying */
+                        /* OSC sets rq_no_resend for the time being */
+                        LASSERT (req->rq_no_resend);
+                        ptlrpc_unregister_bulk (req);
                 }
-                ptlrpc_free_committed(imp);
-                spin_unlock_irqrestore(&imp->imp_lock, flags);
         }
-
-        EXIT;
- out:
-        return rc;
+        
+        LASSERT (!req->rq_receiving_reply);
+        req->rq_phase = RQ_PHASE_INTERPRET;
+        RETURN (rc);
 }
 
 int ptlrpc_replay_req(struct ptlrpc_request *req)
@@ -861,15 +1425,22 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         struct l_wait_info lwi;
         ENTRY;
 
-        init_waitqueue_head(&req->rq_wait_for_rep);
-        DEBUG_REQ(D_NET, req, "");
+        /* I don't touch rq_phase here, so the debug log can show what
+         * state it was left in */
+        
+        /* Not handling automatic bulk replay yet (or ever?) */
+        LASSERT (req->rq_bulk == NULL);
+        
+        DEBUG_REQ(D_NET, req, "about to replay");
 
-        req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
-        req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
+        /* Update request's state, since we might have a new connection. */
+        ptlrpc_put_connection(req->rq_connection);
+        req->rq_connection =
+                ptlrpc_connection_addref(req->rq_import->imp_connection);
 
         /* temporarily set request to RECOVD level (reset at out:) */
         old_level = req->rq_level;
-        if (req->rq_flags & PTL_RPC_FL_REPLIED)
+        if (req->rq_replied)
                 old_status = req->rq_repmsg->status;
         req->rq_level = LUSTRE_CONN_RECOVD;
         rc = ptl_send_rpc(req);
@@ -887,18 +1458,40 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
 
         // up(&cli->cli_rpc_sem);
 
-        if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
+        /* If the reply was received normally, this just grabs the spinlock
+         * (ensuring the reply callback has returned), sees that
+         * req->rq_receiving_reply is clear and returns. */
+        ptlrpc_unregister_reply (req);
+
+        if (!req->rq_replied) {
                 CERROR("Unknown reason for wakeup\n");
                 /* XXX Phil - I end up here when I kill obdctl */
-                ptlrpc_abort(req);
+                /* ...that's because signals aren't all masked in
+                 * l_wait_event() -eeb */
                 GOTO(out, rc = -EINTR);
         }
 
+#if SWAB_PARANOIA
+        /* Clear reply swab mask; this is a new reply in sender's byte order */
+        req->rq_rep_swab_mask = 0;
+#endif
         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
         if (rc) {
                 CERROR("unpack_rep failed: %d\n", rc);
-                GOTO(out, rc);
+                GOTO(out, rc = -EPROTO);
         }
+#if 0
+        /* FIXME: Enable when BlueArc makes new release */
+        if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
+            req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
+                CERROR("invalid packet type received (type=%u)\n",
+                       req->rq_repmsg->type);
+                GOTO(out, rc = -EPROTO);
+        }
+#endif
+
+        /* The transno had better not change over replay. */
+        LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
 
         CDEBUG(D_NET, "got rep "LPD64"\n", req->rq_xid);
 
@@ -906,8 +1499,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         if (req->rq_replay_cb)
                 req->rq_replay_cb(req);
 
-        if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
-            req->rq_repmsg->status != old_status) {
+        if (req->rq_replied && req->rq_repmsg->status != old_status) {
                 DEBUG_REQ(D_HA, req, "status %d, old was %d",
                           req->rq_repmsg->status, old_status);
         }
@@ -917,32 +1509,42 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
-/* XXX looks a lot like super.c:invalidate_request_list, don't it? */
-void ptlrpc_abort_inflight(struct obd_import *imp, int dying_import)
+void ptlrpc_abort_inflight(struct obd_import *imp)
 {
         unsigned long flags;
         struct list_head *tmp, *n;
         ENTRY;
 
         /* Make sure that no new requests get processed for this import.
-         * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
-         * flag and then putting requests on sending_list or delayed_list.
+         * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
+         * this flag and then putting requests on sending_list or delayed_list.
+         */
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        if (!imp->imp_replayable)
+                /* on b_devel, I moved this line to
+                   ptlrpc_set_import_active because I thought it made
+                   more sense there and possibly not all callers of
+                   this function expect this. I'll leave it here until
+                   I can figure out if it's correct or not. - rread 5/12/03  */
+                imp->imp_invalid = 1;
+
+        /* XXX locking?  Maybe we should remove each request with the list
+         * locked?  Also, how do we know if the requests on the list are
+         * being freed at this time?
          */
-        if ((imp->imp_flags & IMP_REPLAYABLE) == 0) {
-                spin_lock_irqsave(&imp->imp_lock, flags);
-                imp->imp_flags |= IMP_INVALID;
-                spin_unlock_irqrestore(&imp->imp_lock, flags);
-        }
-
         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_list);
 
                 DEBUG_REQ(D_HA, req, "inflight");
-                req->rq_flags |= PTL_RPC_FL_ERR;
-                if (dying_import)
-                        req->rq_import = NULL;
-                wake_up(&req->rq_wait_for_rep);
+
+                spin_lock (&req->rq_lock);
+                req->rq_err = 1;
+                if (req->rq_set != NULL)
+                        wake_up(&req->rq_set->set_waitq);
+                else
+                        wake_up(&req->rq_wait_for_rep);
+                spin_unlock (&req->rq_lock);
         }
 
         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
@@ -950,10 +1552,36 @@ void ptlrpc_abort_inflight(struct obd_import *imp, int dying_import)
                         list_entry(tmp, struct ptlrpc_request, rq_list);
 
                 DEBUG_REQ(D_HA, req, "aborting waiting req");
-                req->rq_flags |= PTL_RPC_FL_ERR;
-                if (dying_import)
-                        req->rq_import = NULL;
-                wake_up(&req->rq_wait_for_rep);
+
+                spin_lock (&req->rq_lock);
+                req->rq_err = 1;
+                if (req->rq_set != NULL)
+                        wake_up(&req->rq_set->set_waitq);
+                else
+                        wake_up(&req->rq_wait_for_rep);
+                spin_unlock (&req->rq_lock);
         }
+
+        /* Last chance to free reqs left on the replay list, but we
+         * will still leak reqs that haven't comitted.  */
+        if (imp->imp_replayable)
+                ptlrpc_free_committed(imp);
+
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
         EXIT;
 }
+
+static __u64 ptlrpc_last_xid = 0;
+static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
+
+__u64 ptlrpc_next_xid(void)
+{
+        __u64 tmp;
+        spin_lock(&ptlrpc_last_xid_lock);
+        tmp = ++ptlrpc_last_xid;
+        spin_unlock(&ptlrpc_last_xid_lock);
+        return tmp;
+}
+
+