LU-12567 ptlrpc: handle reply and resend reorder
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 2523468..04fe734 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 /** Implementation of client-side PortalRPC interfaces */
@@ -37,6 +36,7 @@
 #include <linux/delay.h>
 #include <linux/random.h>
 
+#include <lnet/lib-lnet.h>
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_lib.h>
@@ -65,7 +65,29 @@ static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
        int i;
 
        for (i = 0; i < desc->bd_iov_count ; i++)
-               put_page(BD_GET_KIOV(desc, i).kiov_page);
+               put_page(desc->bd_vec[i].bv_page);
+}
+
+static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc,
+                                      void *frag, int len)
+{
+       unsigned int offset = (unsigned long)frag & ~PAGE_MASK;
+
+       ENTRY;
+       while (len > 0) {
+               int page_len = min_t(unsigned int, PAGE_SIZE - offset,
+                                    len);
+               unsigned long vaddr = (unsigned long)frag;
+
+               ptlrpc_prep_bulk_page_nopin(desc,
+                                           lnet_kvaddr_to_page(vaddr),
+                                           offset, page_len);
+               offset = 0;
+               len -= page_len;
+               frag += page_len;
+       }
+
+       RETURN(desc->bd_nob);
 }
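
The new ptlrpc_prep_bulk_frag_pages() above splits a virtually contiguous
fragment at page boundaries, starting from the offset of the first byte
within its page. Below is a minimal userspace sketch of that arithmetic,
not Lustre code: the PAGE_SIZE/PAGE_MASK values are illustrative and the
printf stands in for ptlrpc_prep_bulk_page_nopin().

#include <stdio.h>

#define PAGE_SIZE 4096UL
#define PAGE_MASK (~(PAGE_SIZE - 1))

static void walk_frag(unsigned long vaddr, long len)
{
        unsigned long offset = vaddr & ~PAGE_MASK;

        while (len > 0) {
                long page_len = (long)(PAGE_SIZE - offset) < len ?
                                (long)(PAGE_SIZE - offset) : len;

                /* one bio_vec fragment per loop iteration */
                printf("page@%#lx offset=%lu len=%ld\n",
                       vaddr & PAGE_MASK, offset, page_len);
                offset = 0;
                len -= page_len;
                vaddr += page_len;
        }
}

int main(void)
{
        walk_frag(0x1000f00UL, 8192);   /* crosses three pages */
        return 0;
}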
 
 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
@@ -77,14 +99,10 @@ EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops);
 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
        .add_kiov_frag  = ptlrpc_prep_bulk_page_nopin,
        .release_frags  = ptlrpc_release_bulk_noop,
+       .add_iov_frag   = ptlrpc_prep_bulk_frag_pages,
 };
 EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
 
-const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops = {
-       .add_iov_frag = ptlrpc_prep_bulk_frag,
-};
-EXPORT_SYMBOL(ptlrpc_bulk_kvec_ops);
-
 static int ptlrpc_send_new_req(struct ptlrpc_request *req);
 static int ptlrpcd_check_work(struct ptlrpc_request *req);
 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
@@ -148,26 +166,22 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
        struct ptlrpc_bulk_desc *desc;
        int i;
 
-       /* ensure that only one of KIOV or IOVEC is set but not both */
-       LASSERT((ptlrpc_is_bulk_desc_kiov(type) &&
-                ops->add_kiov_frag != NULL) ||
-               (ptlrpc_is_bulk_desc_kvec(type) &&
-                ops->add_iov_frag != NULL));
+       LASSERT(ops->add_kiov_frag != NULL);
+
+       if (max_brw > PTLRPC_BULK_OPS_COUNT)
+               RETURN(NULL);
+
+       if (nfrags > LNET_MAX_IOV * max_brw)
+               RETURN(NULL);
 
        OBD_ALLOC_PTR(desc);
        if (!desc)
                return NULL;
-       if (type & PTLRPC_BULK_BUF_KIOV) {
-               OBD_ALLOC_LARGE(GET_KIOV(desc),
-                               nfrags * sizeof(*GET_KIOV(desc)));
-               if (!GET_KIOV(desc))
-                       goto out;
-       } else {
-               OBD_ALLOC_LARGE(GET_KVEC(desc),
-                               nfrags * sizeof(*GET_KVEC(desc)));
-               if (!GET_KVEC(desc))
-                       goto out;
-       }
+
+       OBD_ALLOC_LARGE(desc->bd_vec,
+                       nfrags * sizeof(*desc->bd_vec));
+       if (!desc->bd_vec)
+               goto out;
 
        spin_lock_init(&desc->bd_lock);
        init_waitqueue_head(&desc->bd_waitq);
@@ -176,6 +190,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
        desc->bd_portal = portal;
        desc->bd_type = type;
        desc->bd_md_count = 0;
+       desc->bd_nob_last = LNET_MTU;
        desc->bd_frag_ops = ops;
        LASSERT(max_brw > 0);
        desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
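
ptlrpc_new_bulk() now rejects oversized requests up front: at most
PTLRPC_BULK_OPS_COUNT MDs, each carrying at most LNET_MAX_IOV fragments.
A small sketch of that validation follows; the constants are illustrative
and bulk_args_valid() is a hypothetical helper, not part of the patch.

#include <stdbool.h>
#include <stdio.h>

#define PTLRPC_BULK_OPS_COUNT 8
#define LNET_MAX_IOV 256

static bool bulk_args_valid(unsigned int nfrags, unsigned int max_brw)
{
        if (max_brw == 0 || max_brw > PTLRPC_BULK_OPS_COUNT)
                return false;
        return nfrags <= LNET_MAX_IOV * max_brw;
}

int main(void)
{
        printf("%d\n", bulk_args_valid(2048, 8));       /* fits */
        printf("%d\n", bulk_args_valid(2049, 8));       /* too many frags */
        return 0;
}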
@@ -217,7 +232,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
        if (!desc)
                RETURN(NULL);
 
-       desc->bd_import_generation = req->rq_import_generation;
        desc->bd_import = class_import_get(imp);
        desc->bd_req = req;
 
@@ -235,67 +249,49 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
                             struct page *page, int pageoffset, int len,
                             int pin)
 {
-       lnet_kiov_t *kiov;
+       struct bio_vec *kiov;
 
        LASSERT(desc->bd_iov_count < desc->bd_max_iov);
        LASSERT(page != NULL);
        LASSERT(pageoffset >= 0);
        LASSERT(len > 0);
        LASSERT(pageoffset + len <= PAGE_SIZE);
-       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
 
-       kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
+       kiov = &desc->bd_vec[desc->bd_iov_count];
 
+       if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+            ((desc->bd_nob_last + len) > LNET_MTU)) {
+               desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+               desc->bd_md_count++;
+               desc->bd_nob_last = 0;
+               LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+       }
+
+       desc->bd_nob_last += len;
        desc->bd_nob += len;
 
        if (pin)
                get_page(page);
 
-       kiov->kiov_page = page;
-       kiov->kiov_offset = pageoffset;
-       kiov->kiov_len = len;
+       kiov->bv_page = page;
+       kiov->bv_offset = pageoffset;
+       kiov->bv_len = len;
 
        desc->bd_iov_count++;
 }
 EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
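
The accounting added to __ptlrpc_prep_bulk_page() opens a new memory
descriptor whenever the current one already holds LNET_MAX_IOV fragments
or adding the page would push it past LNET_MTU bytes; initializing
bd_nob_last to LNET_MTU forces the first fragment to open the first MD.
A userspace model of that bookkeeping, with illustrative constants:

#include <assert.h>
#include <stdio.h>

#define LNET_MAX_IOV 256
#define LNET_MTU (1U << 20)
#define PTLRPC_BULK_OPS_COUNT 8

struct bulk_acct {
        unsigned int iov_count; /* fragments queued so far */
        unsigned int md_count;  /* MDs opened so far */
        unsigned int nob_last;  /* bytes in the current MD */
};

static void add_frag(struct bulk_acct *a, unsigned int len)
{
        if ((a->iov_count % LNET_MAX_IOV) == 0 ||
            a->nob_last + len > LNET_MTU) {
                a->md_count++;          /* open a new MD */
                a->nob_last = 0;
                assert(a->md_count <= PTLRPC_BULK_OPS_COUNT);
        }
        a->nob_last += len;
        a->iov_count++;
}

int main(void)
{
        /* mirrors the bd_nob_last = LNET_MTU initialization */
        struct bulk_acct a = { .nob_last = LNET_MTU };
        int i;

        for (i = 0; i < 300; i++)
                add_frag(&a, 4096);
        printf("frags=%u mds=%u\n", a.iov_count, a.md_count);
        return 0;
}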
 
-int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
-                         void *frag, int len)
-{
-       struct kvec *iovec;
-
-       ENTRY;
-
-       LASSERT(desc->bd_iov_count < desc->bd_max_iov);
-       LASSERT(frag != NULL);
-       LASSERT(len > 0);
-       LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type));
-
-       iovec = &BD_GET_KVEC(desc, desc->bd_iov_count);
-
-       desc->bd_nob += len;
-
-       iovec->iov_base = frag;
-       iovec->iov_len = len;
-
-       desc->bd_iov_count++;
-
-       RETURN(desc->bd_nob);
-}
-EXPORT_SYMBOL(ptlrpc_prep_bulk_frag);
-
 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 {
        ENTRY;
 
        LASSERT(desc != NULL);
        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
-       LASSERT(desc->bd_md_count == 0);         /* network hands off */
+       LASSERT(desc->bd_refs == 0);         /* network hands off */
        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
        LASSERT(desc->bd_frag_ops != NULL);
 
-       if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
-               sptlrpc_enc_pool_put_pages(desc);
+       sptlrpc_enc_pool_put_pages(desc);
 
        if (desc->bd_export)
                class_export_put(desc->bd_export);
@@ -305,12 +301,8 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
        if (desc->bd_frag_ops->release_frags != NULL)
                desc->bd_frag_ops->release_frags(desc);
 
-       if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
-               OBD_FREE_LARGE(GET_KIOV(desc),
-                              desc->bd_max_iov * sizeof(*GET_KIOV(desc)));
-       else
-               OBD_FREE_LARGE(GET_KVEC(desc),
-                              desc->bd_max_iov * sizeof(*GET_KVEC(desc)));
+       OBD_FREE_LARGE(desc->bd_vec,
+                      desc->bd_max_iov * sizeof(*desc->bd_vec));
        OBD_FREE_PTR(desc);
        EXIT;
 }
@@ -322,10 +314,6 @@ EXPORT_SYMBOL(ptlrpc_free_bulk);
  */
 void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
 {
-       __u32 serv_est;
-       int idx;
-       struct imp_at *at;
-
        LASSERT(req->rq_import);
 
        if (AT_OFF) {
@@ -340,18 +328,25 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
                req->rq_timeout = req->rq_import->imp_server_timeout ?
                                  obd_timeout / 2 : obd_timeout;
        } else {
-               at = &req->rq_import->imp_at;
+               struct imp_at *at = &req->rq_import->imp_at;
+               timeout_t serv_est;
+               int idx;
+
                idx = import_at_get_index(req->rq_import,
                                          req->rq_request_portal);
                serv_est = at_get(&at->iat_service_estimate[idx]);
+               /*
+                * Currently a 32 bit value is sent over the
+                * wire for rq_timeout so please don't change this
+                * to time64_t. The work for LU-1158 will in time
+                * replace rq_timeout with a 64 bit nanosecond value
+                */
                req->rq_timeout = at_est2timeout(serv_est);
        }
        /*
         * We could get even fancier here, using history to predict increased
         * loading...
-        */
-
-       /*
+        *
         * Let the server know what this RPC timeout is by putting it in the
         * reqmsg
         */
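
The rq_timeout selection is unchanged in substance: with adaptive
timeouts disabled a static obd_timeout is used, otherwise the per-portal
service estimate is padded into a deadline. A compact sketch of that
decision; the 25%+5s margin in est2timeout() below is illustrative only,
the real padding is whatever at_est2timeout() applies.

#include <stdio.h>

typedef int timeout_t;

static timeout_t est2timeout(timeout_t est)
{
        return est + est / 4 + 5;       /* illustrative margin */
}

static timeout_t pick_timeout(int at_off, int imp_server_timeout,
                              int obd_timeout, timeout_t serv_est)
{
        if (at_off)
                return imp_server_timeout ? obd_timeout / 2 : obd_timeout;
        return est2timeout(serv_est);
}

int main(void)
{
        printf("AT off:        %d\n", pick_timeout(1, 0, 100, 0));
        printf("AT on, est 12: %d\n", pick_timeout(0, 0, 100, 12));
        return 0;
}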
@@ -361,10 +356,10 @@ EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
 
 /* Adjust max service estimate based on server value */
 static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
-                                 unsigned int serv_est)
+                                 timeout_t serv_est)
 {
        int idx;
-       unsigned int oldse;
+       timeout_t oldse;
        struct imp_at *at;
 
        LASSERT(req->rq_import);
@@ -392,15 +387,16 @@ int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
 
 /* Adjust expected network latency */
 void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
-                              unsigned int service_time)
+                              timeout_t service_timeout)
 {
-       unsigned int nl, oldnl;
-       struct imp_at *at;
        time64_t now = ktime_get_real_seconds();
+       struct imp_at *at;
+       timeout_t oldnl;
+       timeout_t nl;
 
        LASSERT(req->rq_import);
 
-       if (service_time > now - req->rq_sent + 3) {
+       if (service_timeout > now - req->rq_sent + 3) {
                /*
                 * b=16408, however, this can also happen if early reply
                 * is lost and client RPC is expired and resent, early reply
@@ -409,16 +405,17 @@ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
                 * resent time, but server sent back service time of original
                 * RPC.
                 */
-               CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
-                      D_ADAPTTO : D_WARNING,
-                      "Reported service time %u > total measured time %lld\n",
-                      service_time, now - req->rq_sent);
+               CDEBUG_LIMIT((lustre_msg_get_flags(req->rq_reqmsg) &
+                             MSG_RESENT) ?  D_ADAPTTO : D_WARNING,
+                            "Reported service time %u > total measured time %lld\n",
+                            service_timeout, now - req->rq_sent);
                return;
        }
 
-       /* Network latency is total time less server processing time */
-       nl = max_t(int, now - req->rq_sent -
-                       service_time, 0) + 1; /* st rounding */
+       /* Network latency is total time less server processing time,
+        * st rounding
+        */
+       nl = max_t(timeout_t, now - req->rq_sent - service_timeout, 0) + 1;
        at = &req->rq_import->imp_at;
 
        oldnl = at_measured(&at->iat_net_latency, nl);
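
Network latency is the measured round trip minus the service time the
server reported, clamped at zero and bumped by one second to absorb
service-time rounding. A one-function sketch of that formula (userspace,
names hypothetical):

#include <stdio.h>

typedef int timeout_t;

static timeout_t net_latency(long long now, long long sent,
                             timeout_t service_timeout)
{
        long long nl = now - sent - service_timeout;

        return (nl > 0 ? (timeout_t)nl : 0) + 1;        /* st rounding */
}

int main(void)
{
        /* sent at t=100, reply at t=107, server reports 5s of work */
        printf("nl=%d\n", net_latency(107, 100, 5));
        return 0;
}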
@@ -437,14 +434,16 @@ static int unpack_reply(struct ptlrpc_request *req)
        if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
                rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
                if (rc) {
-                       DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
+                       DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d",
+                                 rc);
                        return -EPROTO;
                }
        }
 
        rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
        if (rc) {
-               DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
+               DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d",
+                         rc);
                return -EPROTO;
        }
        return 0;
@@ -458,6 +457,7 @@ static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
 __must_hold(&req->rq_lock)
 {
        struct ptlrpc_request *early_req;
+       timeout_t service_timeout;
        time64_t olddl;
        int rc;
 
@@ -489,8 +489,8 @@ __must_hold(&req->rq_lock)
        lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
 
        /* Network latency can be adjusted, it is pure network delays */
-       ptlrpc_at_adj_net_latency(req,
-                                 lustre_msg_get_service_time(early_req->rq_repmsg));
+       service_timeout = lustre_msg_get_service_timeout(early_req->rq_repmsg);
+       ptlrpc_at_adj_net_latency(req, service_timeout);
 
        sptlrpc_cli_finish_early_reply(early_req);
 
@@ -505,6 +505,8 @@ __must_hold(&req->rq_lock)
        req->rq_deadline = req->rq_sent + req->rq_timeout +
                           ptlrpc_at_get_net_latency(req);
 
+       /* The below message is checked in replay-single.sh test_65{a,b} */
+       /* The below message is checked in sanity-{gss,krb5} test_8 */
        DEBUG_REQ(D_ADAPTTO, req,
                  "Early reply #%d, new deadline in %llds (%llds)",
                  req->rq_early_count,
@@ -548,14 +550,14 @@ void ptlrpc_request_cache_free(struct ptlrpc_request *req)
  */
 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
 {
-       struct list_head *l, *tmp;
        struct ptlrpc_request *req;
 
        LASSERT(pool != NULL);
 
        spin_lock(&pool->prp_lock);
-       list_for_each_safe(l, tmp, &pool->prp_req_list) {
-               req = list_entry(l, struct ptlrpc_request, rq_list);
+       while ((req = list_first_entry_or_null(&pool->prp_req_list,
+                                              struct ptlrpc_request,
+                                              rq_list))) {
                list_del(&req->rq_list);
                LASSERT(req->rq_reqbuf);
                LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
@@ -583,13 +585,11 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
                 "Trying to change pool size with nonempty pool from %d to %d bytes\n",
                 pool->prp_rq_size, size);
 
-       spin_lock(&pool->prp_lock);
        pool->prp_rq_size = size;
        for (i = 0; i < num_rq; i++) {
                struct ptlrpc_request *req;
                struct lustre_msg *msg;
 
-               spin_unlock(&pool->prp_lock);
                req = ptlrpc_request_cache_alloc(GFP_NOFS);
                if (!req)
                        return i;
@@ -603,8 +603,8 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
                req->rq_pool = pool;
                spin_lock(&pool->prp_lock);
                list_add_tail(&req->rq_list, &pool->prp_req_list);
+               spin_unlock(&pool->prp_lock);
        }
-       spin_unlock(&pool->prp_lock);
        return num_rq;
 }
 EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
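
The pool-fill fix above narrows the lock scope: prp_rq_size is set while
the pool is still empty, allocation happens unlocked, and prp_lock only
guards the list insertion. A pthread sketch of the same shape, with a
mutex standing in for the spinlock and a counter standing in for
prp_req_list:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct pool {
        pthread_mutex_t lock;
        int nreqs;              /* stand-in for prp_req_list */
};

static int pool_fill(struct pool *p, int num_rq)
{
        int i;

        for (i = 0; i < num_rq; i++) {
                void *req = malloc(64); /* allocated outside the lock */

                if (!req)
                        return i;       /* partial fill, as in the original */
                pthread_mutex_lock(&p->lock);
                p->nreqs++;             /* only the insertion is locked */
                pthread_mutex_unlock(&p->lock);
                free(req);              /* a real pool keeps the request */
        }
        return num_rq;
}

int main(void)
{
        struct pool p = { PTHREAD_MUTEX_INITIALIZER, 0 };

        printf("added %d (pool=%d)\n", pool_fill(&p, 4), p.nreqs);
        return 0;
}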
@@ -623,7 +623,7 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize,
 {
        struct ptlrpc_request_pool *pool;
 
-       OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool));
+       OBD_ALLOC_PTR(pool);
        if (!pool)
                return NULL;
 
@@ -667,8 +667,8 @@ ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
                return NULL;
        }
 
-       request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
-                            rq_list);
+       request = list_first_entry(&pool->prp_req_list, struct ptlrpc_request,
+                                  rq_list);
        list_del_init(&request->rq_list);
        spin_unlock(&pool->prp_lock);
 
@@ -701,17 +701,14 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
 void ptlrpc_add_unreplied(struct ptlrpc_request *req)
 {
        struct obd_import *imp = req->rq_import;
-       struct list_head *tmp;
        struct ptlrpc_request *iter;
 
        assert_spin_locked(&imp->imp_lock);
        LASSERT(list_empty(&req->rq_unreplied_list));
 
        /* unreplied list is sorted by xid in ascending order */
-       list_for_each_prev(tmp, &imp->imp_unreplied_list) {
-               iter = list_entry(tmp, struct ptlrpc_request,
-                                 rq_unreplied_list);
-
+       list_for_each_entry_reverse(iter, &imp->imp_unreplied_list,
+                                   rq_unreplied_list) {
                LASSERT(req->rq_xid != iter->rq_xid);
                if (req->rq_xid < iter->rq_xid)
                        continue;
@@ -734,8 +731,42 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
        spin_unlock(&req->rq_import->imp_lock);
 }
 
-static __u64 ptlrpc_last_xid;
-static spinlock_t ptlrpc_last_xid_lock;
+static atomic64_t ptlrpc_last_xid;
+
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_import->imp_lock);
+       list_del_init(&req->rq_unreplied_list);
+       ptlrpc_assign_next_xid_nolock(req);
+       spin_unlock(&req->rq_import->imp_lock);
+       DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
+}
+
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+       __u32 opc;
+       __u16 tag;
+
+       opc = lustre_msg_get_opc(req->rq_reqmsg);
+       tag = obd_get_mod_rpc_slot(cli, opc);
+       lustre_msg_set_tag(req->rq_reqmsg, tag);
+       ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       if (tag != 0) {
+               struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+               __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+               obd_put_mod_rpc_slot(cli, opc, tag);
+       }
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
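
ptlrpc_get_mod_rpc_slot()/ptlrpc_put_mod_rpc_slot() pair a request with
a modify-RPC tag: the getter stamps the message and the putter releases
the slot only when a tag was actually granted (tag 0 means none). A
userspace model follows, with a bitmap allocator as an assumed stand-in
for obd_get_mod_rpc_slot()/obd_put_mod_rpc_slot():

#include <stdint.h>
#include <stdio.h>

#define MAX_MOD_RPC_SLOTS 8

static uint32_t slot_bitmap;

static uint16_t get_mod_rpc_slot(void)
{
        uint16_t tag;

        for (tag = 1; tag <= MAX_MOD_RPC_SLOTS; tag++) {
                if (!(slot_bitmap & (1U << tag))) {
                        slot_bitmap |= 1U << tag;
                        return tag;     /* tags are 1-based */
                }
        }
        return 0;                       /* no slot granted */
}

static void put_mod_rpc_slot(uint16_t tag)
{
        if (tag != 0)                   /* release only what was granted */
                slot_bitmap &= ~(1U << tag);
}

int main(void)
{
        uint16_t t1 = get_mod_rpc_slot();
        uint16_t t2 = get_mod_rpc_slot();

        printf("tags: %u %u\n", (unsigned)t1, (unsigned)t2);
        put_mod_rpc_slot(t2);
        put_mod_rpc_slot(t1);
        return 0;
}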
 
 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                             __u32 version, int opcode, char **bufs,
@@ -804,17 +835,18 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                        fail2_t = &request->rq_bulk_deadline;
                } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_ROUND_XID)) {
                        time64_t now = ktime_get_real_seconds();
-                       spin_lock(&ptlrpc_last_xid_lock);
-                       ptlrpc_last_xid = ((__u64)now >> 4) << 24;
-                       spin_unlock(&ptlrpc_last_xid_lock);
+                       u64 xid = ((u64)now >> 4) << 24;
+
+                       atomic64_set(&ptlrpc_last_xid, xid);
                }
 
                if (fail_t) {
-                       *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
+                       *fail_t = ktime_get_real_seconds() +
+                                 PTLRPC_REQ_LONG_UNLINK;
 
                        if (fail2_t)
                                *fail2_t = ktime_get_real_seconds() +
-                                          LONG_UNLINK;
+                                          PTLRPC_REQ_LONG_UNLINK;
 
                        /*
                         * The RPC is infected, let the test to change the
@@ -831,6 +863,7 @@ out_ctx:
        LASSERT(!request->rq_pool);
        sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
 out_free:
+       atomic_dec(&imp->imp_reqs);
        class_import_put(imp);
 
        return rc;
@@ -844,32 +877,7 @@ EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
 int ptlrpc_request_pack(struct ptlrpc_request *request,
                        __u32 version, int opcode)
 {
-       int rc;
-
-       rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
-       if (rc)
-               return rc;
-
-       /*
-        * For some old 1.8 clients (< 1.8.7), they will LASSERT the size of
-        * ptlrpc_body sent from server equal to local ptlrpc_body size, so we
-        * have to send old ptlrpc_body to keep interoprability with these
-        * clients.
-        *
-        * Only three kinds of server->client RPCs so far:
-        *  - LDLM_BL_CALLBACK
-        *  - LDLM_CP_CALLBACK
-        *  - LDLM_GL_CALLBACK
-        *
-        * XXX This should be removed whenever we drop the interoprability with
-        *     the these old clients.
-        */
-       if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
-           opcode == LDLM_GL_CALLBACK)
-               req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
-                                  sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
-
-       return rc;
+       return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
 }
 EXPORT_SYMBOL(ptlrpc_request_pack);
 
@@ -893,13 +901,14 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
        if (request) {
                ptlrpc_cli_req_init(request);
 
-               LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
+               LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp);
                LASSERT(imp != LP_POISON);
                LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
                         imp->imp_client);
                LASSERT(imp->imp_client != LP_POISON);
 
                request->rq_import = class_import_get(imp);
+               atomic_inc(&imp->imp_reqs);
        } else {
                CERROR("request allocation out of memory\n");
        }
@@ -907,6 +916,33 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
        return request;
 }
 
+static int ptlrpc_reconnect_if_idle(struct obd_import *imp)
+{
+       int rc;
+
+       /*
+        * initiate connection if needed when the import has been
+        * referenced by the new request to avoid races with disconnect.
+        * serialize this check against conditional state=IDLE
+        * in ptlrpc_disconnect_idle_interpret()
+        */
+       spin_lock(&imp->imp_lock);
+       if (imp->imp_state == LUSTRE_IMP_IDLE) {
+               imp->imp_generation++;
+               imp->imp_initiated_at = imp->imp_generation;
+               imp->imp_state = LUSTRE_IMP_NEW;
+
+               /* connect_import_locked releases imp_lock */
+               rc = ptlrpc_connect_import_locked(imp);
+               if (rc)
+                       return rc;
+               ptlrpc_pinger_add_import(imp);
+       } else {
+               spin_unlock(&imp->imp_lock);
+       }
+       return 0;
+}
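
ptlrpc_reconnect_if_idle() uses the classic "cheap unlocked check, then
recheck under the lock" pattern so that exactly one thread moves the
import out of IDLE and bumps the generation. A minimal pthread sketch of
that pattern (state names hypothetical):

#include <pthread.h>
#include <stdio.h>

enum imp_state { IMP_IDLE, IMP_NEW, IMP_FULL };

static enum imp_state state = IMP_IDLE;
static pthread_mutex_t imp_lock = PTHREAD_MUTEX_INITIALIZER;
static int generation;

static void reconnect_if_idle(void)
{
        pthread_mutex_lock(&imp_lock);
        if (state == IMP_IDLE) {        /* recheck under the lock */
                generation++;
                state = IMP_NEW;
                pthread_mutex_unlock(&imp_lock);
                /* initiate the connect outside the lock */
                return;
        }
        pthread_mutex_unlock(&imp_lock);
}

int main(void)
{
        if (state != IMP_FULL)          /* cheap unlocked pre-check */
                reconnect_if_idle();
        printf("state=%d generation=%d\n", state, generation);
        return 0;
}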
+
 /**
  * Helper function for creating a request.
  * Calls __ptlrpc_request_alloc to allocate new request structure and inits
@@ -919,38 +955,18 @@ ptlrpc_request_alloc_internal(struct obd_import *imp,
                              const struct req_format *format)
 {
        struct ptlrpc_request *request;
-       int connect = 0;
 
        request = __ptlrpc_request_alloc(imp, pool);
        if (!request)
                return NULL;
 
-       /*
-        * initiate connection if needed when the import has been
-        * referenced by the new request to avoid races with disconnect
-        */
-       if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) {
-               int rc;
-
-               CDEBUG_LIMIT(imp->imp_idle_debug,
-                            "%s: reconnect after %llds idle\n",
-                            imp->imp_obd->obd_name, ktime_get_real_seconds() -
-                                                    imp->imp_last_reply_time);
-               spin_lock(&imp->imp_lock);
-               if (imp->imp_state == LUSTRE_IMP_IDLE) {
-                       imp->imp_generation++;
-                       imp->imp_initiated_at = imp->imp_generation;
-                       imp->imp_state =  LUSTRE_IMP_NEW;
-                       connect = 1;
-               }
-               spin_unlock(&imp->imp_lock);
-               if (connect) {
-                       rc = ptlrpc_connect_import(imp);
-                       if (rc < 0) {
-                               ptlrpc_request_free(request);
-                               return NULL;
-                       }
-                       ptlrpc_pinger_add_import(imp);
+       /* don't make expensive check for idling connection
+        * if it's already connected */
+       if (unlikely(imp->imp_state != LUSTRE_IMP_FULL)) {
+               if (ptlrpc_reconnect_if_idle(imp) < 0) {
+                       atomic_dec(&imp->imp_reqs);
+                       ptlrpc_request_free(request);
+                       return NULL;
                }
        }
 
@@ -1031,8 +1047,8 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
        int cpt;
 
        ENTRY;
-       cpt = cfs_cpt_current(cfs_cpt_table, 0);
-       OBD_CPT_ALLOC(set, cfs_cpt_table, cpt, sizeof(*set));
+       cpt = cfs_cpt_current(cfs_cpt_tab, 0);
+       OBD_CPT_ALLOC(set, cfs_cpt_tab, cpt, sizeof(*set));
        if (!set)
                RETURN(NULL);
        atomic_set(&set->set_refcount, 1);
@@ -1086,8 +1102,7 @@ struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
  */
 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
-       struct list_head *next;
+       struct ptlrpc_request *req;
        int expected_phase;
        int n = 0;
 
@@ -1096,11 +1111,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
        /* Requests on the set should either all be completed, or all be new */
        expected_phase = (atomic_read(&set->set_remaining) == 0) ?
                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
-       list_for_each(tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                LASSERT(req->rq_phase == expected_phase);
                n++;
        }
@@ -1109,10 +1120,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 atomic_read(&set->set_remaining) == n, "%d / %d\n",
                 atomic_read(&set->set_remaining), n);
 
-       list_for_each_safe(tmp, next, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
+       while ((req = list_first_entry_or_null(&set->set_requests,
+                                              struct ptlrpc_request,
+                                              rq_set_chain))) {
                list_del_init(&req->rq_set_chain);
 
                LASSERT(req->rq_phase == expected_phase);
@@ -1144,6 +1154,11 @@ EXPORT_SYMBOL(ptlrpc_set_destroy);
 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                        struct ptlrpc_request *req)
 {
+       if (set == PTLRPCD_SET) {
+               ptlrpcd_add_req(req);
+               return;
+       }
+
        LASSERT(req->rq_import->imp_state != LUSTRE_IMP_IDLE);
        LASSERT(list_empty(&req->rq_set_chain));
 
@@ -1228,7 +1243,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
        if (req->rq_ctx_init || req->rq_ctx_fini) {
                /* always allow ctx init/fini rpc go through */
        } else if (imp->imp_state == LUSTRE_IMP_NEW) {
-               DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
+               DEBUG_REQ(D_ERROR, req, "Uninitialized import");
                *status = -EIO;
        } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg);
@@ -1238,11 +1253,11 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
                 * race with umount
                 */
                DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ?
-                         D_HA : D_ERROR, req, "IMP_CLOSED ");
+                         D_HA : D_ERROR, req, "IMP_CLOSED");
                *status = -EIO;
        } else if (ptlrpc_send_limit_expired(req)) {
                /* probably doesn't need to be a D_ERROR after initial testing */
-               DEBUG_REQ(D_HA, req, "send limit expired ");
+               DEBUG_REQ(D_HA, req, "send limit expired");
                *status = -ETIMEDOUT;
        } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
                   imp->imp_state == LUSTRE_IMP_CONNECTING) {
@@ -1266,13 +1281,13 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
                } else if (req->rq_no_delay &&
                           imp->imp_generation != imp->imp_initiated_at) {
                        /* ignore nodelay for requests initiating connections */
-                       *status = -EWOULDBLOCK;
+                       *status = -EAGAIN;
                } else if (req->rq_allow_replay &&
                           (imp->imp_state == LUSTRE_IMP_REPLAY ||
                            imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
                            imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
                            imp->imp_state == LUSTRE_IMP_RECOVER)) {
-                       DEBUG_REQ(D_HA, req, "allow during recovery.\n");
+                       DEBUG_REQ(D_HA, req, "allow during recovery");
                } else {
                        delay = 1;
                }
@@ -1329,32 +1344,28 @@ static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err)
  */
 static int ptlrpc_check_status(struct ptlrpc_request *req)
 {
-       int err;
+       int rc;
 
        ENTRY;
-       err = lustre_msg_get_status(req->rq_repmsg);
+       rc = lustre_msg_get_status(req->rq_repmsg);
        if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
                struct obd_import *imp = req->rq_import;
                lnet_nid_t nid = imp->imp_connection->c_peer.nid;
                __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
 
-               if (ptlrpc_console_allow(req, opc, err))
+               if (ptlrpc_console_allow(req, opc, rc))
                        LCONSOLE_ERROR_MSG(0x11,
                                           "%s: operation %s to node %s failed: rc = %d\n",
                                           imp->imp_obd->obd_name,
                                           ll_opcode2str(opc),
-                                          libcfs_nid2str(nid), err);
-               RETURN(err < 0 ? err : -EINVAL);
+                                          libcfs_nid2str(nid), rc);
+               RETURN(rc < 0 ? rc : -EINVAL);
        }
 
-       if (err < 0) {
-               DEBUG_REQ(D_INFO, req, "status is %d", err);
-       } else if (err > 0) {
-               /* XXX: translate this error from net to host */
-               DEBUG_REQ(D_INFO, req, "status is %d", err);
-       }
+       if (rc)
+               DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc);
 
-       RETURN(err);
+       RETURN(rc);
 }
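
The renamed rc in ptlrpc_check_status() keeps the same contract: an
error-type reply must carry a negative errno, so a non-negative status
there is treated as protocol corruption (-EINVAL); otherwise the
server's status is returned as-is. A minimal sketch of that mapping:

#include <errno.h>
#include <stdio.h>

static int check_status(int is_err_msg, int status)
{
        if (is_err_msg)
                return status < 0 ? status : -EINVAL;
        return status;
}

int main(void)
{
        printf("%d\n", check_status(1, -2));    /* errno passed through */
        printf("%d\n", check_status(1, 5));     /* bogus -> -EINVAL */
        printf("%d\n", check_status(0, 0));     /* success */
        return 0;
}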
 
 /**
@@ -1388,8 +1399,8 @@ __u64 ptlrpc_known_replied_xid(struct obd_import *imp)
        if (list_empty(&imp->imp_unreplied_list))
                return 0;
 
-       req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
-                        rq_unreplied_list);
+       req = list_first_entry(&imp->imp_unreplied_list, struct ptlrpc_request,
+                              rq_unreplied_list);
        LASSERTF(req->rq_xid >= 1, "XID:%llu\n", req->rq_xid);
 
        if (imp->imp_known_replied_xid < req->rq_xid - 1)
@@ -1422,7 +1433,7 @@ static int after_reply(struct ptlrpc_request *req)
        if (req->rq_reply_truncated) {
                if (ptlrpc_no_resend(req)) {
                        DEBUG_REQ(D_ERROR, req,
-                                 "reply buffer overflow, expected: %d, actual size: %d",
+                                 "reply buffer overflow, expected=%d, actual size=%d",
                                  req->rq_nob_received, req->rq_repbuf_len);
                        RETURN(-EOVERFLOW);
                }
@@ -1451,7 +1462,7 @@ static int after_reply(struct ptlrpc_request *req)
         */
        rc = sptlrpc_cli_unwrap_reply(req);
        if (rc) {
-               DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
+               DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc);
                RETURN(rc);
        }
 
@@ -1470,7 +1481,8 @@ static int after_reply(struct ptlrpc_request *req)
            ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
                time64_t now = ktime_get_real_seconds();
 
-               DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
+               DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) |
+                         D_RPCTRACE, req, "resending request on EINPROGRESS");
                spin_lock(&req->rq_lock);
                req->rq_resend = 1;
                spin_unlock(&req->rq_lock);
@@ -1514,7 +1526,7 @@ static int after_reply(struct ptlrpc_request *req)
                CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
        ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
        ptlrpc_at_adj_net_latency(req,
-                                 lustre_msg_get_service_time(req->rq_repmsg));
+                                 lustre_msg_get_service_timeout(req->rq_repmsg));
 
        rc = ptlrpc_check_status(req);
 
@@ -1682,9 +1694,24 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 
        lustre_msg_set_last_xid(req->rq_reqmsg, min_xid);
 
-       lustre_msg_set_status(req->rq_reqmsg, current_pid());
-
-       rc = sptlrpc_req_refresh_ctx(req, -1);
+       lustre_msg_set_status(req->rq_reqmsg, current->pid);
+
+       /* If the request to be sent is an LDLM callback, do not try to
+        * refresh context.
+        * An LDLM callback is sent by a server to a client in order to make
+        * it release a lock, on a communication channel that uses a reverse
+        * context. It cannot be refreshed on its own, as it is the 'reverse'
+        * (server-side) representation of a client context.
+        * We do not care if the reverse context is expired, and want to send
+        * the LDLM callback anyway. Once the client receives the AST, it is
+        * its job to refresh its own context if it has expired, hence
+        * refreshing the associated reverse context on server side, before
+        * being able to send the LDLM_CANCEL requested by the server.
+        */
+       if (lustre_msg_get_opc(req->rq_reqmsg) != LDLM_BL_CALLBACK &&
+           lustre_msg_get_opc(req->rq_reqmsg) != LDLM_CP_CALLBACK &&
+           lustre_msg_get_opc(req->rq_reqmsg) != LDLM_GL_CALLBACK)
+               rc = sptlrpc_req_refresh_ctx(req, 0);
        if (rc) {
                if (req->rq_err) {
                        req->rq_status = rc;
@@ -1698,25 +1725,28 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
        }
 
        CDEBUG(D_RPCTRACE,
-              "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
-              current_comm(),
+              "Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
+              req, current->comm,
               imp->imp_obd->obd_uuid.uuid,
               lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
-              obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg));
+              obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg),
+              lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
 
        rc = ptl_send_rpc(req, 0);
        if (rc == -ENOMEM) {
                spin_lock(&imp->imp_lock);
                if (!list_empty(&req->rq_list)) {
                        list_del_init(&req->rq_list);
-                       atomic_dec(&req->rq_import->imp_inflight);
+                       if (atomic_dec_and_test(&req->rq_import->imp_inflight))
+                               wake_up(&req->rq_import->imp_recovery_waitq);
                }
                spin_unlock(&imp->imp_lock);
                ptlrpc_rqphase_move(req, RQ_PHASE_NEW);
                RETURN(rc);
        }
        if (rc) {
-               DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
+               DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d",
+                         rc);
                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);
@@ -1761,19 +1791,16 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
  */
 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp, *next;
-       struct list_head  comp_reqs;
+       struct ptlrpc_request *req, *next;
+       LIST_HEAD(comp_reqs);
        int force_timer_recalc = 0;
 
        ENTRY;
        if (atomic_read(&set->set_remaining) == 0)
                RETURN(1);
 
-       INIT_LIST_HEAD(&comp_reqs);
-       list_for_each_safe(tmp, next, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
+       list_for_each_entry_safe(req, next, &set->set_requests,
+                                rq_set_chain) {
                struct obd_import *imp = req->rq_import;
                int unregistered = 0;
                int async = 1;
@@ -1856,7 +1883,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         * not corrupt any data.
                         */
                        if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
-                           ptlrpc_client_recv_or_unlink(req))
+                           ptlrpc_cli_wait_unlink(req))
                                continue;
                        if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
                            ptlrpc_client_bulk_active(req))
@@ -1894,7 +1921,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                        /*
                         * Check if we still need to wait for unlink.
                         */
-                       if (ptlrpc_client_recv_or_unlink(req) ||
+                       if (ptlrpc_cli_wait_unlink(req) ||
                            ptlrpc_client_bulk_active(req))
                                continue;
                        /* If there is no need to resend, fail it now. */
@@ -1919,7 +1946,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                }
 
                /*
-                * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
+                * ptlrpc_set_wait uses l_wait_event_abortable_timeout()
                 * so it sets rq_intr regardless of individual rpc
                 * timeouts. The synchronous IO waiting path sets
                 * rq_intr irrespective of whether ptlrpcd
@@ -1952,9 +1979,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                         * put on delay list - only if we wait
                                         * recovery finished - before send
                                         */
-                                       list_del_init(&req->rq_list);
-                                       list_add_tail(&req->rq_list,
-                                                     &imp->imp_delayed_list);
+                                       list_move_tail(&req->rq_list,
+                                                      &imp->imp_delayed_list);
                                        spin_unlock(&imp->imp_lock);
                                        continue;
                                }
@@ -1978,9 +2004,29 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                        GOTO(interpret, req->rq_status);
                                }
 
-                               list_del_init(&req->rq_list);
-                               list_add_tail(&req->rq_list,
-                                             &imp->imp_sending_list);
+                               /* don't resend too fast in case of network
+                                * errors.
+                                */
+                               if (ktime_get_real_seconds() < (req->rq_sent + 1)
+                                   && req->rq_net_err && req->rq_timedout) {
+
+                                       DEBUG_REQ(D_INFO, req,
+                                                 "throttle request");
+                                       /* Don't try to resend RPC right away
+                                        * as it is likely it will fail again
+                                        * and ptlrpc_check_set() will be
+                                        * called again, keeping this thread
+                                        * busy. Instead, wait for the next
+                                        * timeout. Flag it as resend to
+                                        * ensure we don't wait too long.
+                                        */
+                                       req->rq_resend = 1;
+                                       spin_unlock(&imp->imp_lock);
+                                       continue;
+                               }
+
+                               list_move_tail(&req->rq_list,
+                                              &imp->imp_sending_list);
 
                                spin_unlock(&imp->imp_lock);
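
The throttle above refuses to resend within one second of the previous
send when the request died of a network error; it flags rq_resend and
lets a later ptlrpc_check_set() pass retry. A minimal userspace model of
that predicate:

#include <stdbool.h>
#include <stdio.h>

struct req {
        long long sent;
        bool net_err, timedout, resend;
};

static bool may_send_now(struct req *r, long long now)
{
        if (now < r->sent + 1 && r->net_err && r->timedout) {
                r->resend = true;       /* retry on a later pass */
                return false;
        }
        return true;
}

int main(void)
{
        struct req r = { .sent = 100, .net_err = true, .timedout = true };

        printf("t=100: %d\n", may_send_now(&r, 100));   /* throttled */
        printf("t=102: %d\n", may_send_now(&r, 102));   /* allowed */
        return 0;
}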
 
@@ -2001,7 +2047,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                 * rq_wait_ctx is only touched by ptlrpcd,
                                 * so no lock is needed here.
                                 */
-                               status = sptlrpc_req_refresh_ctx(req, -1);
+                               status = sptlrpc_req_refresh_ctx(req, 0);
                                if (status) {
                                        if (req->rq_err) {
                                                req->rq_status = status;
@@ -2111,7 +2157,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         * was good after getting the REPLY for her GET or
                         * the ACK for her PUT.
                         */
-                       DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+                       DEBUG_REQ(D_ERROR, req, "bulk transfer failed %d/%d/%d",
+                                 req->rq_status,
+                                 req->rq_bulk->bd_nob,
+                                 req->rq_bulk->bd_nob_transferred);
                        req->rq_status = -EIO;
                }
 
@@ -2149,13 +2198,14 @@ interpret:
 
                if (req->rq_reqmsg)
                        CDEBUG(D_RPCTRACE,
-                              "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
-                              current_comm(),
+                              "Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
+                              req, current->comm,
                               imp->imp_obd->obd_uuid.uuid,
                               lustre_msg_get_status(req->rq_reqmsg),
                               req->rq_xid,
                               obd_import_nid2str(imp),
-                              lustre_msg_get_opc(req->rq_reqmsg));
+                              lustre_msg_get_opc(req->rq_reqmsg),
+                              lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
 
                spin_lock(&imp->imp_lock);
                /*
@@ -2166,13 +2216,14 @@ interpret:
                 */
                if (!list_empty(&req->rq_list)) {
                        list_del_init(&req->rq_list);
-                       atomic_dec(&imp->imp_inflight);
+                       if (atomic_dec_and_test(&imp->imp_inflight))
+                               wake_up(&imp->imp_recovery_waitq);
                }
                list_del_init(&req->rq_unreplied_list);
                spin_unlock(&imp->imp_lock);
 
                atomic_dec(&set->set_remaining);
-               wake_up_all(&imp->imp_recovery_waitq);
+               wake_up(&imp->imp_recovery_waitq);
 
                if (set->set_producer) {
                        /* produce a new request if possible */
@@ -2234,7 +2285,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
                       req->rq_real_sent < req->rq_sent ||
                       req->rq_real_sent >= req->rq_deadline) ?
                      "timed out for sent delay" : "timed out for slow reply"),
-                 (s64)req->rq_sent, (s64)req->rq_real_sent);
+                 req->rq_sent, req->rq_real_sent);
 
        if (imp && obd_debug_peer_on_timeout)
                LNetDebugPeer(imp->imp_connection->c_peer);
@@ -2289,13 +2340,11 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
 
 /**
  * Time out all uncompleted requests in request set pointed by \a data
- * Callback used when waiting on sets with l_wait_event.
- * Always returns 1.
+ * This is called when a wait times out.
  */
-int ptlrpc_expired_set(void *data)
+void ptlrpc_expired_set(struct ptlrpc_request_set *set)
 {
-       struct ptlrpc_request_set *set = data;
-       struct list_head *tmp;
+       struct ptlrpc_request *req;
        time64_t now = ktime_get_real_seconds();
 
        ENTRY;
@@ -2304,11 +2353,7 @@ int ptlrpc_expired_set(void *data)
        /*
         * A timeout expired. See which reqs it applies to...
         */
-       list_for_each(tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                /* don't expire request waiting for context */
                if (req->rq_wait_ctx)
                        continue;
@@ -2328,43 +2373,28 @@ int ptlrpc_expired_set(void *data)
                 * ptlrpcd thread.
                 */
                ptlrpc_expire_one_request(req, 1);
-       }
-
-       /*
-        * When waiting for a whole set, we always break out of the
-        * sleep so we can recalculate the timeout, or enable interrupts
-        * if everyone's timed out.
-        */
-       RETURN(1);
-}
+               /*
+                * Loops require that we resched once in a while to avoid
+                * RCU stalls and a few other problems.
+                */
+               cond_resched();
 
-/**
- * Sets rq_intr flag in \a req under spinlock.
- */
-void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
-{
-       spin_lock(&req->rq_lock);
-       req->rq_intr = 1;
-       spin_unlock(&req->rq_lock);
+       }
 }
-EXPORT_SYMBOL(ptlrpc_mark_interrupted);
 
 /**
  * Interrupts (sets interrupted flag) all uncompleted requests in
- * a set \a data. Callback for l_wait_event for interruptible waits.
+ * a set \a data. This is called when a wait_event is interrupted
+ * by a signal.
  */
-static void ptlrpc_interrupted_set(void *data)
+static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set)
 {
-       struct ptlrpc_request_set *set = data;
-       struct list_head *tmp;
+       struct ptlrpc_request *req;
 
        LASSERT(set != NULL);
        CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
 
-       list_for_each(tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                if (req->rq_intr)
                        continue;
 
@@ -2373,7 +2403,9 @@ static void ptlrpc_interrupted_set(void *data)
                    !req->rq_allow_intr)
                        continue;
 
-               ptlrpc_mark_interrupted(req);
+               spin_lock(&req->rq_lock);
+               req->rq_intr = 1;
+               spin_unlock(&req->rq_lock);
        }
 }
 
@@ -2382,16 +2414,13 @@ static void ptlrpc_interrupted_set(void *data)
  */
 time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
        time64_t now = ktime_get_real_seconds();
        int timeout = 0;
        struct ptlrpc_request *req;
        time64_t deadline;
 
        ENTRY;
-       list_for_each(tmp, &set->set_requests) {
-               req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                /* Request in-flight? */
                if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
                      (req->rq_phase == RQ_PHASE_BULK) ||
@@ -2429,9 +2458,7 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
  */
 int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
        struct ptlrpc_request *req;
-       struct l_wait_info lwi;
        time64_t timeout;
        int rc;
 
@@ -2439,9 +2466,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
        if (set->set_producer)
                (void)ptlrpc_set_producer(set);
        else
-               list_for_each(tmp, &set->set_requests) {
-                       req = list_entry(tmp, struct ptlrpc_request,
-                                        rq_set_chain);
+               list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                        if (req->rq_phase == RQ_PHASE_NEW)
                                (void)ptlrpc_send_new_req(req);
                }
@@ -2460,49 +2485,67 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
                       set, timeout);
 
                if ((timeout == 0 && !signal_pending(current)) ||
-                   set->set_allow_intr)
+                   set->set_allow_intr) {
                        /*
                         * No requests are in-flight (ether timed out
                         * or delayed), so we can allow interrupts.
                         * We still want to block for a limited time,
                         * so we allow interrupts during the timeout.
                         */
-                       lwi = LWI_TIMEOUT_INTR_ALL(
-                                       cfs_time_seconds(timeout ? timeout : 1),
-                                       ptlrpc_expired_set,
-                                       ptlrpc_interrupted_set, set);
-               else
+                       rc = l_wait_event_abortable_timeout(
+                               set->set_waitq,
+                               ptlrpc_check_set(NULL, set),
+                               cfs_time_seconds(timeout ? timeout : 1));
+                       if (rc == 0) {
+                               rc = -ETIMEDOUT;
+                               ptlrpc_expired_set(set);
+                       } else if (rc < 0) {
+                               rc = -EINTR;
+                               ptlrpc_interrupted_set(set);
+                       } else {
+                               rc = 0;
+                       }
+               } else {
                        /*
                         * At least one request is in flight, so no
                         * interrupts are allowed. Wait until all
                         * complete, or an in-flight req times out.
                         */
-                       lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
-                                         ptlrpc_expired_set, set);
-
-               rc = l_wait_event(set->set_waitq,
-                                 ptlrpc_check_set(NULL, set), &lwi);
-
-               /*
-                * LU-769 - if we ignored the signal because it was already
-                * pending when we started, we need to handle it now or we risk
-                * it being ignored forever
-                */
-               if (rc == -ETIMEDOUT &&
-                   (!lwi.lwi_allow_intr || set->set_allow_intr) &&
-                   signal_pending(current)) {
-                       sigset_t blocked_sigs =
-                                          cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+                       rc = wait_event_idle_timeout(
+                               set->set_waitq,
+                               ptlrpc_check_set(NULL, set),
+                               cfs_time_seconds(timeout ? timeout : 1));
+                       if (rc == 0) {
+                               ptlrpc_expired_set(set);
+                               rc = -ETIMEDOUT;
+                       } else {
+                               rc = 0;
+                       }
 
                        /*
-                        * In fact we only interrupt for the "fatal" signals
-                        * like SIGINT or SIGKILL. We still ignore less
-                        * important signals since ptlrpc set is not easily
-                        * reentrant from userspace again
+                        * LU-769 - if we ignored the signal because
+                        * it was already pending when we started, we
+                        * need to handle it now or we risk it being
+                        * ignored forever
                         */
-                       if (signal_pending(current))
-                               ptlrpc_interrupted_set(set);
-                       cfs_restore_sigs(blocked_sigs);
+                       if (rc == -ETIMEDOUT &&
+                           signal_pending(current)) {
+                               sigset_t old, new;
+
+                               siginitset(&new, LUSTRE_FATAL_SIGS);
+                               sigprocmask(SIG_BLOCK, &new, &old);
+                               /*
+                                * In fact we only interrupt for the
+                                * "fatal" signals like SIGINT or
+                                * SIGKILL. We still ignore less
+                                * important signals since ptlrpc set
+                                * is not easily reentrant from
+                                * userspace again
+                                */
+                               if (signal_pending(current))
+                                       ptlrpc_interrupted_set(set);
+                               sigprocmask(SIG_SETMASK, &old, NULL);
+                       }
                }
 
                LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
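
The rewritten wait maps the kernel wait_event return convention back
onto the old l_wait_event semantics: 0 means the timeout elapsed (expire
the set), negative means a fatal signal (mark members interrupted),
positive means the condition came true. A sketch of that mapping, with
hypothetical callbacks standing in for the set handlers:

#include <errno.h>
#include <stdio.h>

static int map_wait_rc(long wait_rc, void (*on_timeout)(void),
                       void (*on_intr)(void))
{
        if (wait_rc == 0) {
                on_timeout();           /* ptlrpc_expired_set() */
                return -ETIMEDOUT;
        }
        if (wait_rc < 0) {
                on_intr();              /* ptlrpc_interrupted_set() */
                return -EINTR;
        }
        return 0;                       /* condition satisfied */
}

static void expired(void)     { puts("expired"); }
static void interrupted(void) { puts("interrupted"); }

int main(void)
{
        printf("%d\n", map_wait_rc(0, expired, interrupted));
        printf("%d\n", map_wait_rc(-1, expired, interrupted));
        printf("%d\n", map_wait_rc(1, expired, interrupted));
        return 0;
}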
@@ -2517,9 +2560,8 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
                 * the error cases -eeb.
                 */
                if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
-                       list_for_each(tmp, &set->set_requests) {
-                               req = list_entry(tmp, struct ptlrpc_request,
-                                                rq_set_chain);
+                       list_for_each_entry(req, &set->set_requests,
+                                           rq_set_chain) {
                                spin_lock(&req->rq_lock);
                                req->rq_invalid_rqset = 1;
                                spin_unlock(&req->rq_lock);
@@ -2530,9 +2572,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
        LASSERT(atomic_read(&set->set_remaining) == 0);
 
        rc = set->set_rc; /* rq_status of already freed requests if any */
-       list_for_each(tmp, &set->set_requests) {
-               req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
                if (req->rq_status != 0)
                        rc = req->rq_status;
@@ -2590,6 +2630,10 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                sptlrpc_cli_free_repbuf(request);
 
        if (request->rq_import) {
+               if (!ptlrpcd_check_work(request)) {
+                       LASSERT(atomic_read(&request->rq_import->imp_reqs) > 0);
+                       atomic_dec(&request->rq_import->imp_reqs);
+               }
                class_import_put(request->rq_import);
                request->rq_import = NULL;
        }
@@ -2704,9 +2748,7 @@ EXPORT_SYMBOL(ptlrpc_req_xid);
  */
 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 {
-       int rc;
-       struct l_wait_info lwi;
-
+       bool discard = false;
+
        /*
         * Might sleep.
         */
@@ -2716,20 +2758,23 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
            async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
                request->rq_reply_deadline = ktime_get_real_seconds() +
-                                            LONG_UNLINK;
+                                            PTLRPC_REQ_LONG_UNLINK;
 
        /*
         * Nothing left to do.
         */
-       if (!ptlrpc_client_recv_or_unlink(request))
+       if (!__ptlrpc_cli_wait_unlink(request, &discard))
                RETURN(1);
 
        LNetMDUnlink(request->rq_reply_md_h);
 
+       if (discard) /* Discard the request-out callback */
+               __LNetMDUnlink(request->rq_req_md_h, discard);
+
        /*
         * Let's check it once again.
         */
-       if (!ptlrpc_client_recv_or_unlink(request))
+       if (!ptlrpc_cli_wait_unlink(request))
                RETURN(1);
 
        /* Move to "Unregistering" phase as reply was not unlinked yet. */
@@ -2742,29 +2787,30 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
                RETURN(0);
 
        /*
-        * We have to l_wait_event() whatever the result, to give liblustre
+        * We have to wait_event_idle_timeout() regardless of the result, to get
         * a chance to run reply_in_callback(), and to make sure we've
         * unlinked before returning a req to the pool.
         */
        for (;;) {
-               /* The wq argument is ignored by user-space wait_event macros */
                wait_queue_head_t *wq = (request->rq_set) ?
                                        &request->rq_set->set_waitq :
                                        &request->rq_reply_waitq;
+               int seconds = PTLRPC_REQ_LONG_UNLINK;
                /*
                 * Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish NALs
                 */
-               lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
-                                          cfs_time_seconds(1), NULL, NULL);
-               rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
-                                 &lwi);
-               if (rc == 0) {
+               while (seconds > 0 &&
+                      wait_event_idle_timeout(
+                              *wq,
+                              !ptlrpc_cli_wait_unlink(request),
+                              cfs_time_seconds(1)) == 0)
+                       seconds -= 1;
+               if (seconds > 0) {
                        ptlrpc_rqphase_move(request, request->rq_next_phase);
                        RETURN(1);
                }
 
-               LASSERT(rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, request,
                          "Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d",
                          request->rq_receiving_reply,
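The sliced wait above replaces the old l_wait_event()/LWI_TIMEOUT_INTERVAL pair: rather than one long sleep, it spends a budget of PTLRPC_REQ_LONG_UNLINK seconds in one-second ticks, so an exhausted budget can trigger the warning and the loop then keeps waiting. A condensed sketch of the idiom, where cond() is a hypothetical stand-in for !ptlrpc_cli_wait_unlink(request):

	int seconds = PTLRPC_REQ_LONG_UNLINK;

	/* wait_event_idle_timeout() returns 0 only when the 1s slice
	 * expires with cond() still false; otherwise stop slicing. */
	while (seconds > 0 &&
	       wait_event_idle_timeout(*wq, cond(),
				       cfs_time_seconds(1)) == 0)
		seconds -= 1;
	if (seconds > 0)
		return 1;	/* condition met within the budget */
	/* else: warn about the unexpectedly long unlink and loop again */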
@@ -2970,7 +3016,7 @@ EXPORT_SYMBOL(ptlrpc_request_addref);
 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                      struct obd_import *imp)
 {
-       struct list_head *tmp;
+       struct ptlrpc_request *iter;
 
        assert_spin_locked(&imp->imp_lock);
 
@@ -2998,11 +3044,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
        LASSERT(imp->imp_replayable);
        /* Balanced in ptlrpc_free_committed, usually. */
        ptlrpc_request_addref(req);
-       list_for_each_prev(tmp, &imp->imp_replay_list) {
-               struct ptlrpc_request *iter = list_entry(tmp,
-                                                        struct ptlrpc_request,
-                                                        rq_replay_list);
-
+       list_for_each_entry_reverse(iter, &imp->imp_replay_list,
+                                   rq_replay_list) {
                /*
                 * We may have duplicate transnos if we create and then
                 * open a file, or for closes retained to match creating
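Several hunks in this patch apply the same mechanical conversion, so one before/after sketch covers them all: an open-coded list_for_each() plus list_entry() becomes list_for_each_entry(), which performs the container_of() step internally and drops the separate struct list_head cursor.

	/* before: explicit cursor, entry recovered by hand */
	struct list_head *tmp;

	list_for_each_prev(tmp, &imp->imp_replay_list) {
		struct ptlrpc_request *iter =
			list_entry(tmp, struct ptlrpc_request,
				   rq_replay_list);
		/* ... use iter ... */
	}

	/* after: the typed iterator walks the same list in the same
	 * (reverse) order */
	struct ptlrpc_request *iter;

	list_for_each_entry_reverse(iter, &imp->imp_replay_list,
				    rq_replay_list) {
		/* ... use iter ... */
	}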
@@ -3046,7 +3089,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
        }
 
        /* for distributed debugging */
-       lustre_msg_set_status(req->rq_reqmsg, current_pid());
+       lustre_msg_set_status(req->rq_reqmsg, current->pid);
 
        /* add a ref for the set (see comment in ptlrpc_set_add_req) */
        ptlrpc_request_addref(req);
@@ -3081,7 +3124,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
        if (!ptlrpc_client_replied(req) ||
            (req->rq_bulk &&
             lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) {
-               DEBUG_REQ(D_ERROR, req, "request replay timed out.\n");
+               DEBUG_REQ(D_ERROR, req, "request replay timed out");
                GOTO(out, rc = -ETIMEDOUT);
        }
 
@@ -3093,7 +3136,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
        /** VBR: check version failure */
        if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
                /** replay failed due to version mismatch */
-               DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
+               DEBUG_REQ(D_WARNING, req, "Version mismatch during replay");
                spin_lock(&imp->imp_lock);
                imp->imp_vbr_failed = 1;
                spin_unlock(&imp->imp_lock);
@@ -3116,13 +3159,13 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
        /* transaction number shouldn't be bigger than the latest replayed */
        if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
                DEBUG_REQ(D_ERROR, req,
-                         "Reported transno %llu is bigger than the replayed one: %llu",
+                         "Reported transno=%llu is bigger than replayed=%llu",
                          req->rq_transno,
                          lustre_msg_get_transno(req->rq_reqmsg));
                GOTO(out, rc = -EINVAL);
        }
 
-       DEBUG_REQ(D_HA, req, "got rep");
+       DEBUG_REQ(D_HA, req, "got reply");
 
        /* let the callback do fixups, possibly including in the request */
        if (req->rq_replay_cb)
@@ -3215,8 +3258,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
 
        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
 
-       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-       aa = ptlrpc_req_async_args(req);
+       aa = ptlrpc_req_async_args(aa, req);
        memset(aa, 0, sizeof(*aa));
 
        /* Prepare request to be resent with ptlrpcd */
@@ -3232,8 +3274,8 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
        ptlrpc_at_set_req_timeout(req);
 
        /* Tell server net_latency to calculate how long to wait for reply. */
-       lustre_msg_set_service_time(req->rq_reqmsg,
-                                   ptlrpc_at_get_net_latency(req));
+       lustre_msg_set_service_timeout(req->rq_reqmsg,
+                                      ptlrpc_at_get_net_latency(req));
        DEBUG_REQ(D_HA, req, "REPLAY");
 
        atomic_inc(&req->rq_import->imp_replay_inflight);
@@ -3251,26 +3293,22 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
  */
 void ptlrpc_abort_inflight(struct obd_import *imp)
 {
-       struct list_head *tmp, *n;
-
+       struct ptlrpc_request *req;
        ENTRY;
+
        /*
         * Make sure that no new requests get processed for this import.
         * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
         * this flag and then putting requests on sending_list or delayed_list.
         */
-       spin_lock(&imp->imp_lock);
+       assert_spin_locked(&imp->imp_lock);
 
        /*
         * XXX locking?  Maybe we should remove each request with the list
         * locked?  Also, how do we know if the requests on the list are
         * being freed at this time?
         */
-       list_for_each_safe(tmp, n, &imp->imp_sending_list) {
-               struct ptlrpc_request *req = list_entry(tmp,
-                                                       struct ptlrpc_request,
-                                                       rq_list);
-
+       list_for_each_entry(req, &imp->imp_sending_list, rq_list) {
                DEBUG_REQ(D_RPCTRACE, req, "inflight");
 
                spin_lock(&req->rq_lock);
@@ -3282,10 +3320,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                spin_unlock(&req->rq_lock);
        }
 
-       list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request, rq_list);
-
+       list_for_each_entry(req, &imp->imp_delayed_list, rq_list) {
                DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
 
                spin_lock(&req->rq_lock);
@@ -3304,8 +3339,6 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
        if (imp->imp_replayable)
                ptlrpc_free_committed(imp);
 
-       spin_unlock(&imp->imp_lock);
-
        EXIT;
 }
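Note the contract change above: ptlrpc_abort_inflight() no longer takes imp_lock itself but asserts it, so every caller must now hold the lock across the call. A caller-side sketch (illustrative; the call sites themselves are outside this excerpt):

	spin_lock(&imp->imp_lock);
	ptlrpc_abort_inflight(imp);
	spin_unlock(&imp->imp_lock);

Relatedly, the loops can drop the list_for_each_safe() variants because they only flag requests (rq_err, rq_intr) under rq_lock and never unlink entries while iterating.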
 
@@ -3314,15 +3347,11 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
  */
 void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp, *pos;
+       struct ptlrpc_request *req;
 
        LASSERT(set != NULL);
 
-       list_for_each_safe(pos, tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(pos, struct ptlrpc_request,
-                                  rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                spin_lock(&req->rq_lock);
                if (req->rq_phase != RQ_PHASE_RPC) {
                        spin_unlock(&req->rq_lock);
@@ -3355,19 +3384,21 @@ void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 void ptlrpc_init_xid(void)
 {
        time64_t now = ktime_get_real_seconds();
+       u64 xid;
 
-       spin_lock_init(&ptlrpc_last_xid_lock);
        if (now < YEAR_2004) {
-               get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
-               ptlrpc_last_xid >>= 2;
-               ptlrpc_last_xid |= (1ULL << 61);
+               get_random_bytes(&xid, sizeof(xid));
+               xid >>= 2;
+               xid |= (1ULL << 61);
        } else {
-               ptlrpc_last_xid = (__u64)now << 20;
+               xid = (u64)now << 20;
        }
 
        /* Need to always be aligned to a power-of-two for multi-bulk BRW */
-       CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
-       ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
+       BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) !=
+                    0);
+       xid &= PTLRPC_BULK_OPS_MASK;
+       atomic64_set(&ptlrpc_last_xid, xid);
 }
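The seeding arithmetic is easier to see with concrete numbers (illustrative values): a sane clock is shifted into the high bits, leaving the low 20 bits as headroom of roughly one million xids per second before a freshly rebooted client's seed could overlap this one; the mask then aligns the seed to a PTLRPC_BULK_OPS_COUNT boundary.

	u64 now = 1700000000ULL;	/* Nov 2023 */
	u64 xid = now << 20;		/* 0x6553f10000000 */

	/* round down to a power-of-two boundary for multi-bulk BRW */
	xid &= PTLRPC_BULK_OPS_MASK;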
 
 /**
@@ -3384,14 +3415,7 @@ void ptlrpc_init_xid(void)
  */
 __u64 ptlrpc_next_xid(void)
 {
-       __u64 next;
-
-       spin_lock(&ptlrpc_last_xid_lock);
-       next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-       ptlrpc_last_xid = next;
-       spin_unlock(&ptlrpc_last_xid_lock);
-
-       return next;
+       return atomic64_add_return(PTLRPC_BULK_OPS_COUNT, &ptlrpc_last_xid);
 }
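This is the classic lock elision for a monotonic counter: atomic64_add_return() performs the read-modify-write in a single step and returns the post-increment value, so both formulations hand out identical sequences spaced PTLRPC_BULK_OPS_COUNT apart (leaving room for per-bulk match bits). A side-by-side sketch:

	/* before: counter guarded by a spinlock */
	spin_lock(&ptlrpc_last_xid_lock);
	next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
	ptlrpc_last_xid = next;
	spin_unlock(&ptlrpc_last_xid_lock);

	/* after: one atomic read-modify-write, no lock */
	next = atomic64_add_return(PTLRPC_BULK_OPS_COUNT, &ptlrpc_last_xid);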
 
 /**
@@ -3400,12 +3424,11 @@ __u64 ptlrpc_next_xid(void)
  * request to ensure previous bulk fails and avoid problems with lost replies
  * and therefore several transfers landing into the same buffer from different
  * sending attempts.
+ * This also avoids a previous reply landing in a different sending attempt.
  */
-void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
+void ptlrpc_set_mbits(struct ptlrpc_request *req)
 {
-       struct ptlrpc_bulk_desc *bd = req->rq_bulk;
-
-       LASSERT(bd != NULL);
+       int md_count = req->rq_bulk ? req->rq_bulk->bd_md_count : 1;
 
        /*
         * Generate new matchbits for all resend requests, including
@@ -3421,7 +3444,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
                 * 'resend for the -EINPROGRESS resend'. To make it simple,
                 * we opt to generate mbits for all resend cases.
                 */
-               if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data,
+               if (OCD_HAS_FLAG(&req->rq_import->imp_connect_data,
                                 BULK_MBITS)) {
                        req->rq_mbits = ptlrpc_next_xid();
                } else {
@@ -3435,17 +3458,16 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
                        spin_unlock(&req->rq_import->imp_lock);
                        req->rq_mbits = req->rq_xid;
                }
-               CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
+               CDEBUG(D_HA, "resend with new mbits: old x%llu, new x%llu\n",
                       old_mbits, req->rq_mbits);
        } else if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
                /* Request being sent first time, use xid as matchbits. */
-               if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)
-                   || req->rq_mbits == 0) {
+               if (OCD_HAS_FLAG(&req->rq_import->imp_connect_data,
+                                BULK_MBITS) || req->rq_mbits == 0) {
                        req->rq_mbits = req->rq_xid;
                } else {
-                       int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) /
-                                       LNET_MAX_IOV;
-                       req->rq_mbits -= total_md - 1;
+                       req->rq_mbits -= md_count - 1;
                }
        } else {
                /*
@@ -3460,8 +3482,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * that server can infer the number of bulks that were prepared,
         * see LU-1431
         */
-       req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
-                         LNET_MAX_IOV) - 1;
+       req->rq_mbits += md_count - 1;
 
        /*
         * Set rq_xid as rq_mbits to indicate the final bulk for the old
@@ -3470,7 +3491,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * It's ok to directly set the rq_xid here, since this xid bump
         * won't affect the request position in unreplied list.
         */
-       if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS))
+       if (!OCD_HAS_FLAG(&req->rq_import->imp_connect_data, BULK_MBITS))
                req->rq_xid = req->rq_mbits;
 }
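A worked example with illustrative numbers: on a first-time send over a BULK_MBITS-capable connection with three bulk MDs, bd_md_count now supplies the count that used to be recomputed from bd_iov_count.

	int md_count = 3;		/* req->rq_bulk->bd_md_count */

	req->rq_mbits = req->rq_xid;	/* first attempt: mbits = X */
	req->rq_mbits += md_count - 1;	/* final mbits = X + 2 */
	/* the MDs match X, X+1, X+2; the server infers the number of
	 * prepared bulks from the final mbits value (see LU-1431) */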
 
@@ -3480,19 +3501,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
  */
 __u64 ptlrpc_sample_next_xid(void)
 {
-#if BITS_PER_LONG == 32
-       /* need to avoid possible word tearing on 32-bit systems */
-       __u64 next;
-
-       spin_lock(&ptlrpc_last_xid_lock);
-       next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-       spin_unlock(&ptlrpc_last_xid_lock);
-
-       return next;
-#else
-       /* No need to lock, since returned value is racy anyways */
-       return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-#endif
+       return atomic64_read(&ptlrpc_last_xid) + PTLRPC_BULK_OPS_COUNT;
 }
 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
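The removed BITS_PER_LONG == 32 branch existed only to avoid word tearing when reading a 64-bit value on 32-bit CPUs. The atomic64 API already guarantees a tear-free read everywhere, and the sampled value is racy by design, so no lock is needed:

	/* callers only want a recent value; atomic64_read() can never
	 * observe a half-updated 64-bit word */
	__u64 sample = atomic64_read(&ptlrpc_last_xid);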
 
@@ -3591,8 +3600,7 @@ void *ptlrpcd_alloc_work(struct obd_import *imp,
        req->rq_no_delay = req->rq_no_resend = 1;
        req->rq_pill.rc_fmt = (void *)&worker_format;
 
-       CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
-       args = ptlrpc_req_async_args(req);
+       args = ptlrpc_req_async_args(args, req);
        args->cb     = cb;
        args->cbdata = cbdata;