LU-13004 ptlrpc: Allow BULK_BUF_KIOV to accept a kvec
[fs/lustre-release.git] / lustre/ptlrpc/client.c
index bc983ac..85bb30d 100644
@@ -37,6 +37,7 @@
 #include <linux/delay.h>
 #include <linux/random.h>
 
+#include <lnet/lib-lnet.h>
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_lib.h>
 
 #include "ptlrpc_internal.h"
 
+static void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc,
+                                     struct page *page, int pageoffset,
+                                     int len)
+{
+       __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1);
+}
+
+static void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc,
+                                       struct page *page, int pageoffset,
+                                       int len)
+{
+       __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0);
+}
+
+static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
+{
+       int i;
+
+       for (i = 0; i < desc->bd_iov_count ; i++)
+               put_page(BD_GET_KIOV(desc, i).kiov_page);
+}
+
+static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc,
+                                      void *frag, int len)
+{
+       unsigned int offset = (uintptr_t)frag & ~PAGE_MASK;
+
+       ENTRY;
+       while (len > 0) {
+               int page_len = min_t(unsigned int, PAGE_SIZE - offset,
+                                    len);
+               uintptr_t vaddr = (uintptr_t) frag;
+
+               ptlrpc_prep_bulk_page_nopin(desc,
+                                           lnet_kvaddr_to_page(vaddr),
+                                           offset, page_len);
+               offset = 0;
+               len -= page_len;
+               frag += page_len;
+       }
+
+       RETURN(desc->bd_nob);
+}
+
 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
        .add_kiov_frag  = ptlrpc_prep_bulk_page_pin,
        .release_frags  = ptlrpc_release_bulk_page_pin,
@@ -55,6 +100,7 @@ EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops);
 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
        .add_kiov_frag  = ptlrpc_prep_bulk_page_nopin,
        .release_frags  = ptlrpc_release_bulk_noop,
+       .add_iov_frag   = ptlrpc_prep_bulk_frag_pages,
 };
 EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
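 
The new add_iov_frag hook lets a caller hand a kernel-virtual buffer (a kvec) straight to a KIOV bulk descriptor: ptlrpc_prep_bulk_frag_pages() splits the buffer on page boundaries and maps each piece back to its struct page with lnet_kvaddr_to_page(), adding every fragment unpinned, which is why only the nopin ops table gains the hook. A hedged caller sketch; 'desc', 'buf' and 'buflen' are assumed to exist in the caller and only the hook itself comes from this patch:

        /* Sketch: attach a kmalloc'd/kernel-virtual buffer as bulk fragments. */
        if (desc->bd_frag_ops->add_iov_frag)
                desc->bd_frag_ops->add_iov_frag(desc, buf, buflen);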
 
@@ -70,7 +116,7 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
 /**
  * Initialize passed in client structure \a cl.
  */
-void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
+void ptlrpc_init_client(int req_portal, int rep_portal, const char *name,
                        struct ptlrpc_client *cl)
 {
        cl->cli_request_portal = req_portal;
@@ -154,7 +200,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
        desc->bd_portal = portal;
        desc->bd_type = type;
        desc->bd_md_count = 0;
-       desc->bd_frag_ops = (struct ptlrpc_bulk_frag_ops *)ops;
+       desc->bd_frag_ops = ops;
        LASSERT(max_brw > 0);
        desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
        /*
@@ -195,7 +241,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
        if (!desc)
                RETURN(NULL);
 
-       desc->bd_import_generation = req->rq_import_generation;
        desc->bd_import = class_import_get(imp);
        desc->bd_req = req;
 
@@ -415,14 +460,16 @@ static int unpack_reply(struct ptlrpc_request *req)
        if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
                rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
                if (rc) {
-                       DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
+                       DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d",
+                                 rc);
                        return -EPROTO;
                }
        }
 
        rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
        if (rc) {
-               DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
+               DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d",
+                         rc);
                return -EPROTO;
        }
        return 0;
@@ -483,6 +530,8 @@ __must_hold(&req->rq_lock)
        req->rq_deadline = req->rq_sent + req->rq_timeout +
                           ptlrpc_at_get_net_latency(req);
 
+       /* The below message is checked in replay-single.sh test_65{a,b} */
+       /* The below message is checked in sanity-{gss,krb5} test_8 */
        DEBUG_REQ(D_ADAPTTO, req,
                  "Early reply #%d, new deadline in %llds (%llds)",
                  req->rq_early_count,
@@ -561,13 +610,11 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
                 "Trying to change pool size with nonempty pool from %d to %d bytes\n",
                 pool->prp_rq_size, size);
 
-       spin_lock(&pool->prp_lock);
        pool->prp_rq_size = size;
        for (i = 0; i < num_rq; i++) {
                struct ptlrpc_request *req;
                struct lustre_msg *msg;
 
-               spin_unlock(&pool->prp_lock);
                req = ptlrpc_request_cache_alloc(GFP_NOFS);
                if (!req)
                        return i;
@@ -581,8 +628,8 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
                req->rq_pool = pool;
                spin_lock(&pool->prp_lock);
                list_add_tail(&req->rq_list, &pool->prp_req_list);
+               spin_unlock(&pool->prp_lock);
        }
-       spin_unlock(&pool->prp_lock);
        return num_rq;
 }
 EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
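 
The per-request allocation (which may sleep under GFP_NOFS) now runs outside prp_lock; the spinlock only guards the insertion into the shared prp_req_list. Condensed, the per-iteration locking in the loop above becomes:

        /* Sketch of the narrowed lock scope inside the loop above. */
        req = ptlrpc_request_cache_alloc(GFP_NOFS);     /* unlocked, may sleep */
        /* ... allocate and attach the lustre_msg buffer ... */
        spin_lock(&pool->prp_lock);
        list_add_tail(&req->rq_list, &pool->prp_req_list);
        spin_unlock(&pool->prp_lock);
 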
@@ -601,7 +648,7 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize,
 {
        struct ptlrpc_request_pool *pool;
 
-       OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool));
+       OBD_ALLOC_PTR(pool);
        if (!pool)
                return NULL;
 
@@ -712,8 +759,42 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
        spin_unlock(&req->rq_import->imp_lock);
 }
 
-static __u64 ptlrpc_last_xid;
-static spinlock_t ptlrpc_last_xid_lock;
+static atomic64_t ptlrpc_last_xid;
+
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_import->imp_lock);
+       list_del_init(&req->rq_unreplied_list);
+       ptlrpc_assign_next_xid_nolock(req);
+       spin_unlock(&req->rq_import->imp_lock);
+       DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
+}
+
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+       __u32 opc;
+       __u16 tag;
+
+       opc = lustre_msg_get_opc(req->rq_reqmsg);
+       tag = obd_get_mod_rpc_slot(cli, opc);
+       lustre_msg_set_tag(req->rq_reqmsg, tag);
+       ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       if (tag != 0) {
+               struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+               __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+               obd_put_mod_rpc_slot(cli, opc, tag);
+       }
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
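 
These helpers tie a modify-RPC slot tag to the request and re-assign its XID once the slot is taken, so XID order follows slot order. A hedged usage sketch; the request setup and ptlrpc_queue_wait() are assumed to come from the existing client paths:

        /* Sketch: bracket a modifying RPC with the slot helpers. */
        ptlrpc_get_mod_rpc_slot(req);   /* take a slot, tag the request,
                                         * reassign the xid under imp_lock */
        rc = ptlrpc_queue_wait(req);
        ptlrpc_put_mod_rpc_slot(req);   /* release the slot keyed by the tag */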
 
 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                             __u32 version, int opcode, char **bufs,
@@ -782,9 +863,9 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                        fail2_t = &request->rq_bulk_deadline;
                } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_ROUND_XID)) {
                        time64_t now = ktime_get_real_seconds();
-                       spin_lock(&ptlrpc_last_xid_lock);
-                       ptlrpc_last_xid = ((__u64)now >> 4) << 24;
-                       spin_unlock(&ptlrpc_last_xid_lock);
+                       u64 xid = ((u64)now >> 4) << 24;
+
+                       atomic64_set(&ptlrpc_last_xid, xid);
                }
 
                if (fail_t) {
@@ -822,32 +903,7 @@ EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
 int ptlrpc_request_pack(struct ptlrpc_request *request,
                        __u32 version, int opcode)
 {
-       int rc;
-
-       rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
-       if (rc)
-               return rc;
-
-       /*
-        * For some old 1.8 clients (< 1.8.7), they will LASSERT the size of
-        * ptlrpc_body sent from server equal to local ptlrpc_body size, so we
-        * have to send old ptlrpc_body to keep interoprability with these
-        * clients.
-        *
-        * Only three kinds of server->client RPCs so far:
-        *  - LDLM_BL_CALLBACK
-        *  - LDLM_CP_CALLBACK
-        *  - LDLM_GL_CALLBACK
-        *
-        * XXX This should be removed whenever we drop the interoprability with
-        *     the these old clients.
-        */
-       if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
-           opcode == LDLM_GL_CALLBACK)
-               req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
-                                  sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
-
-       return rc;
+       return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
 }
 EXPORT_SYMBOL(ptlrpc_request_pack);
 
@@ -897,7 +953,6 @@ ptlrpc_request_alloc_internal(struct obd_import *imp,
                              const struct req_format *format)
 {
        struct ptlrpc_request *request;
-       int connect = 0;
 
        request = __ptlrpc_request_alloc(imp, pool);
        if (!request)
@@ -918,17 +973,17 @@ ptlrpc_request_alloc_internal(struct obd_import *imp,
                if (imp->imp_state == LUSTRE_IMP_IDLE) {
                        imp->imp_generation++;
                        imp->imp_initiated_at = imp->imp_generation;
-                       imp->imp_state =  LUSTRE_IMP_NEW;
-                       connect = 1;
-               }
-               spin_unlock(&imp->imp_lock);
-               if (connect) {
-                       rc = ptlrpc_connect_import(imp);
+                       imp->imp_state = LUSTRE_IMP_NEW;
+
+                       /* connect_import_locked releases imp_lock */
+                       rc = ptlrpc_connect_import_locked(imp);
                        if (rc < 0) {
                                ptlrpc_request_free(request);
                                return NULL;
                        }
                        ptlrpc_pinger_add_import(imp);
+               } else {
+                       spin_unlock(&imp->imp_lock);
                }
        }
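 
Folding the connect into the locked section removes the old 'connect' flag and its unlock/relock window: ptlrpc_connect_import_locked() is entered with imp_lock held and, per the comment, releases it itself. The resulting pattern, restated from the hunk above rather than new behaviour:

        spin_lock(&imp->imp_lock);
        if (imp->imp_state == LUSTRE_IMP_IDLE) {
                imp->imp_state = LUSTRE_IMP_NEW;
                rc = ptlrpc_connect_import_locked(imp); /* drops imp_lock */
        } else {
                spin_unlock(&imp->imp_lock);
        }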
 
@@ -1206,7 +1261,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
        if (req->rq_ctx_init || req->rq_ctx_fini) {
                /* always allow ctx init/fini rpc go through */
        } else if (imp->imp_state == LUSTRE_IMP_NEW) {
-               DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
+               DEBUG_REQ(D_ERROR, req, "Uninitialized import");
                *status = -EIO;
        } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg);
@@ -1216,11 +1271,11 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
                 * race with umount
                 */
                DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ?
-                         D_HA : D_ERROR, req, "IMP_CLOSED ");
+                         D_HA : D_ERROR, req, "IMP_CLOSED");
                *status = -EIO;
        } else if (ptlrpc_send_limit_expired(req)) {
                /* probably doesn't need to be a D_ERROR after initial testing */
-               DEBUG_REQ(D_HA, req, "send limit expired ");
+               DEBUG_REQ(D_HA, req, "send limit expired");
                *status = -ETIMEDOUT;
        } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
                   imp->imp_state == LUSTRE_IMP_CONNECTING) {
@@ -1250,7 +1305,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
                            imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
                            imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
                            imp->imp_state == LUSTRE_IMP_RECOVER)) {
-                       DEBUG_REQ(D_HA, req, "allow during recovery.\n");
+                       DEBUG_REQ(D_HA, req, "allow during recovery");
                } else {
                        delay = 1;
                }
@@ -1307,32 +1362,28 @@ static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err)
  */
 static int ptlrpc_check_status(struct ptlrpc_request *req)
 {
-       int err;
+       int rc;
 
        ENTRY;
-       err = lustre_msg_get_status(req->rq_repmsg);
+       rc = lustre_msg_get_status(req->rq_repmsg);
        if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
                struct obd_import *imp = req->rq_import;
                lnet_nid_t nid = imp->imp_connection->c_peer.nid;
                __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
 
-               if (ptlrpc_console_allow(req, opc, err))
+               if (ptlrpc_console_allow(req, opc, rc))
                        LCONSOLE_ERROR_MSG(0x11,
                                           "%s: operation %s to node %s failed: rc = %d\n",
                                           imp->imp_obd->obd_name,
                                           ll_opcode2str(opc),
-                                          libcfs_nid2str(nid), err);
-               RETURN(err < 0 ? err : -EINVAL);
+                                          libcfs_nid2str(nid), rc);
+               RETURN(rc < 0 ? rc : -EINVAL);
        }
 
-       if (err < 0) {
-               DEBUG_REQ(D_INFO, req, "status is %d", err);
-       } else if (err > 0) {
-               /* XXX: translate this error from net to host */
-               DEBUG_REQ(D_INFO, req, "status is %d", err);
-       }
+       if (rc)
+               DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc);
 
-       RETURN(err);
+       RETURN(rc);
 }
 
 /**
@@ -1400,7 +1451,7 @@ static int after_reply(struct ptlrpc_request *req)
        if (req->rq_reply_truncated) {
                if (ptlrpc_no_resend(req)) {
                        DEBUG_REQ(D_ERROR, req,
-                                 "reply buffer overflow, expected: %d, actual size: %d",
+                                 "reply buffer overflow, expected=%d, actual size=%d",
                                  req->rq_nob_received, req->rq_repbuf_len);
                        RETURN(-EOVERFLOW);
                }
@@ -1429,7 +1480,7 @@ static int after_reply(struct ptlrpc_request *req)
         */
        rc = sptlrpc_cli_unwrap_reply(req);
        if (rc) {
-               DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
+               DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc);
                RETURN(rc);
        }
 
@@ -1448,7 +1499,8 @@ static int after_reply(struct ptlrpc_request *req)
            ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
                time64_t now = ktime_get_real_seconds();
 
-               DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
+               DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) |
+                         D_RPCTRACE, req, "resending request on EINPROGRESS");
                spin_lock(&req->rq_lock);
                req->rq_resend = 1;
                spin_unlock(&req->rq_lock);
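 
The EINPROGRESS resend message now reaches the console only once per eight resends: D_WARNING is ORed into the mask when rq_nr_resend % 8 == 1, otherwise the message stays at trace level. Equivalently:

        /* Illustrative expansion of the mask expression above. */
        if (req->rq_nr_resend % 8 == 1)         /* 1st, 9th, 17th, ... resend */
                DEBUG_REQ(D_WARNING | D_RPCTRACE, req,
                          "resending request on EINPROGRESS");
        else
                DEBUG_REQ(D_RPCTRACE, req, "resending request on EINPROGRESS");
 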
@@ -1676,11 +1728,12 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
        }
 
        CDEBUG(D_RPCTRACE,
-              "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
-              current_comm(),
+              "Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
+              req, current_comm(),
               imp->imp_obd->obd_uuid.uuid,
               lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
-              obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg));
+              obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg),
+              lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
 
        rc = ptl_send_rpc(req, 0);
        if (rc == -ENOMEM) {
@@ -1694,7 +1747,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
                RETURN(rc);
        }
        if (rc) {
-               DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
+               DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d",
+                         rc);
                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);
@@ -1930,9 +1984,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                         * put on delay list - only if we wait
                                         * recovery finished - before send
                                         */
-                                       list_del_init(&req->rq_list);
-                                       list_add_tail(&req->rq_list,
-                                                     &imp->imp_delayed_list);
+                                       list_move_tail(&req->rq_list,
+                                                      &imp->imp_delayed_list);
                                        spin_unlock(&imp->imp_lock);
                                        continue;
                                }
@@ -1956,9 +2009,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                        GOTO(interpret, req->rq_status);
                                }
 
-                               list_del_init(&req->rq_list);
-                               list_add_tail(&req->rq_list,
-                                             &imp->imp_sending_list);
+                               list_move_tail(&req->rq_list,
+                                              &imp->imp_sending_list);
 
                                spin_unlock(&imp->imp_lock);
 
@@ -2127,13 +2179,14 @@ interpret:
 
                if (req->rq_reqmsg)
                        CDEBUG(D_RPCTRACE,
-                              "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
-                              current_comm(),
+                              "Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
+                              req, current_comm(),
                               imp->imp_obd->obd_uuid.uuid,
                               lustre_msg_get_status(req->rq_reqmsg),
                               req->rq_xid,
                               obd_import_nid2str(imp),
-                              lustre_msg_get_opc(req->rq_reqmsg));
+                              lustre_msg_get_opc(req->rq_reqmsg),
+                              lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
 
                spin_lock(&imp->imp_lock);
                /*
@@ -3059,7 +3112,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
        if (!ptlrpc_client_replied(req) ||
            (req->rq_bulk &&
             lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) {
-               DEBUG_REQ(D_ERROR, req, "request replay timed out.\n");
+               DEBUG_REQ(D_ERROR, req, "request replay timed out");
                GOTO(out, rc = -ETIMEDOUT);
        }
 
@@ -3071,7 +3124,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
        /** VBR: check version failure */
        if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
                /** replay was failed due to version mismatch */
-               DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
+               DEBUG_REQ(D_WARNING, req, "Version mismatch during replay");
                spin_lock(&imp->imp_lock);
                imp->imp_vbr_failed = 1;
                spin_unlock(&imp->imp_lock);
@@ -3094,13 +3147,13 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
        /* transaction number shouldn't be bigger than the latest replayed */
        if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
                DEBUG_REQ(D_ERROR, req,
-                         "Reported transno %llu is bigger than the replayed one: %llu",
+                         "Reported transno=%llu is bigger than replayed=%llu",
                          req->rq_transno,
                          lustre_msg_get_transno(req->rq_reqmsg));
                GOTO(out, rc = -EINVAL);
        }
 
-       DEBUG_REQ(D_HA, req, "got rep");
+       DEBUG_REQ(D_HA, req, "got reply");
 
        /* let the callback do fixups, possibly including in the request */
        if (req->rq_replay_cb)
@@ -3193,8 +3246,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
 
        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
 
-       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-       aa = ptlrpc_req_async_args(req);
+       aa = ptlrpc_req_async_args(aa, req);
        memset(aa, 0, sizeof(*aa));
 
        /* Prepare request to be resent with ptlrpcd */
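 
ptlrpc_req_async_args() now takes the destination variable as well as the request, so the size check against rq_async_args can live inside the macro instead of an explicit CLASSERT at every call site. The real definition lives in the headers; a plausible shape, offered as an assumption rather than the verbatim macro:

/* Hypothetical sketch of the helper macro, for orientation only. */
#define ptlrpc_req_async_args(_var, req)                                \
({                                                                      \
        BUILD_BUG_ON(sizeof(*(_var)) > sizeof((req)->rq_async_args));  \
        (typeof(_var))&(req)->rq_async_args;                            \
})
 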
@@ -3230,14 +3282,14 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
 void ptlrpc_abort_inflight(struct obd_import *imp)
 {
        struct list_head *tmp, *n;
-
        ENTRY;
+
        /*
         * Make sure that no new requests get processed for this import.
         * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
         * this flag and then putting requests on sending_list or delayed_list.
         */
-       spin_lock(&imp->imp_lock);
+       assert_spin_locked(&imp->imp_lock);
 
        /*
         * XXX locking?  Maybe we should remove each request with the list
@@ -3282,8 +3334,6 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
        if (imp->imp_replayable)
                ptlrpc_free_committed(imp);
 
-       spin_unlock(&imp->imp_lock);
-
        EXIT;
 }
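 
With the spin_lock/spin_unlock pair removed and assert_spin_locked() added, ptlrpc_abort_inflight() now runs entirely under a lock its caller already holds:

        /* New calling convention (caller side, sketch): */
        spin_lock(&imp->imp_lock);
        ptlrpc_abort_inflight(imp);
        spin_unlock(&imp->imp_lock);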
 
@@ -3333,19 +3383,21 @@ void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 void ptlrpc_init_xid(void)
 {
        time64_t now = ktime_get_real_seconds();
+       u64 xid;
 
-       spin_lock_init(&ptlrpc_last_xid_lock);
        if (now < YEAR_2004) {
-               get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
-               ptlrpc_last_xid >>= 2;
-               ptlrpc_last_xid |= (1ULL << 61);
+               get_random_bytes(&xid, sizeof(xid));
+               xid >>= 2;
+               xid |= (1ULL << 61);
        } else {
-               ptlrpc_last_xid = (__u64)now << 20;
+               xid = (u64)now << 20;
        }
 
        /* Need to always be aligned to a power-of-two for multi-bulk BRW */
-       CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
-       ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
+       BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) !=
+                    0);
+       xid &= PTLRPC_BULK_OPS_MASK;
+       atomic64_set(&ptlrpc_last_xid, xid);
 }
 
 /**
@@ -3362,14 +3414,7 @@ void ptlrpc_init_xid(void)
  */
 __u64 ptlrpc_next_xid(void)
 {
-       __u64 next;
-
-       spin_lock(&ptlrpc_last_xid_lock);
-       next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-       ptlrpc_last_xid = next;
-       spin_unlock(&ptlrpc_last_xid_lock);
-
-       return next;
+       return atomic64_add_return(PTLRPC_BULK_OPS_COUNT, &ptlrpc_last_xid);
 }
 
 /**
@@ -3458,19 +3503,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
  */
 __u64 ptlrpc_sample_next_xid(void)
 {
-#if BITS_PER_LONG == 32
-       /* need to avoid possible word tearing on 32-bit systems */
-       __u64 next;
-
-       spin_lock(&ptlrpc_last_xid_lock);
-       next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-       spin_unlock(&ptlrpc_last_xid_lock);
-
-       return next;
-#else
-       /* No need to lock, since returned value is racy anyways */
-       return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
-#endif
+       return atomic64_read(&ptlrpc_last_xid) + PTLRPC_BULK_OPS_COUNT;
 }
 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
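 
With ptlrpc_last_xid an atomic64_t, XID allocation is lock-free on both 32- and 64-bit kernels: ptlrpc_next_xid() advances the counter by PTLRPC_BULK_OPS_COUNT, preserving the alignment established in ptlrpc_init_xid(), while ptlrpc_sample_next_xid() only peeks at it. Illustrative values only:

        /* Illustrative; actual values depend on boot time/randomness. */
        __u64 a = ptlrpc_next_xid();    /* previous xid + PTLRPC_BULK_OPS_COUNT */
        __u64 b = ptlrpc_next_xid();    /* a + PTLRPC_BULK_OPS_COUNT */
        __u64 peek = ptlrpc_sample_next_xid();  /* racy read, may be stale */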
 
@@ -3569,8 +3602,7 @@ void *ptlrpcd_alloc_work(struct obd_import *imp,
        req->rq_no_delay = req->rq_no_resend = 1;
        req->rq_pill.rc_fmt = (void *)&worker_format;
 
-       CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
-       args = ptlrpc_req_async_args(req);
+       args = ptlrpc_req_async_args(args, req);
        args->cb     = cb;
        args->cbdata = cbdata;