Whamcloud - gitweb
LU-12567 ptlrpc: handle reply and resend reorder
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 6e16826..04fe734 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 /** Implementation of client-side PortalRPC interfaces */
@@ -169,6 +168,12 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
 
        LASSERT(ops->add_kiov_frag != NULL);
 
+       if (max_brw > PTLRPC_BULK_OPS_COUNT)
+               RETURN(NULL);
+
+       if (nfrags > LNET_MAX_IOV * max_brw)
+               RETURN(NULL);
+
        OBD_ALLOC_PTR(desc);
        if (!desc)
                return NULL;
@@ -185,6 +190,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
        desc->bd_portal = portal;
        desc->bd_type = type;
        desc->bd_md_count = 0;
+       desc->bd_nob_last = LNET_MTU;
        desc->bd_frag_ops = ops;
        LASSERT(max_brw > 0);
        desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
@@ -253,6 +259,15 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 
        kiov = &desc->bd_vec[desc->bd_iov_count];
 
+       if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+            ((desc->bd_nob_last + len) > LNET_MTU)) {
+               desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+               desc->bd_md_count++;
+               desc->bd_nob_last = 0;
+               LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+       }
+
+       desc->bd_nob_last += len;
        desc->bd_nob += len;
 
        if (pin)
@@ -272,7 +287,7 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 
        LASSERT(desc != NULL);
        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
-       LASSERT(desc->bd_md_count == 0);         /* network hands off */
+       LASSERT(desc->bd_refs == 0);         /* network hands off */
        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
        LASSERT(desc->bd_frag_ops != NULL);
 
@@ -390,10 +405,10 @@ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
                 * resent time, but server sent back service time of original
                 * RPC.
                 */
-               CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
-                      D_ADAPTTO : D_WARNING,
-                      "Reported service time %u > total measured time %lld\n",
-                      service_timeout, now - req->rq_sent);
+               CDEBUG_LIMIT((lustre_msg_get_flags(req->rq_reqmsg) &
+                             MSG_RESENT) ?  D_ADAPTTO : D_WARNING,
+                            "Reported service time %u > total measured time %lld\n",
+                            service_timeout, now - req->rq_sent);
                return;
        }
 
@@ -535,14 +550,14 @@ void ptlrpc_request_cache_free(struct ptlrpc_request *req)
  */
 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
 {
-       struct list_head *l, *tmp;
        struct ptlrpc_request *req;
 
        LASSERT(pool != NULL);
 
        spin_lock(&pool->prp_lock);
-       list_for_each_safe(l, tmp, &pool->prp_req_list) {
-               req = list_entry(l, struct ptlrpc_request, rq_list);
+       while ((req = list_first_entry_or_null(&pool->prp_req_list,
+                                              struct ptlrpc_request,
+                                              rq_list))) {
                list_del(&req->rq_list);
                LASSERT(req->rq_reqbuf);
                LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
@@ -652,8 +667,8 @@ ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
                return NULL;
        }
 
-       request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
-                            rq_list);
+       request = list_first_entry(&pool->prp_req_list, struct ptlrpc_request,
+                                  rq_list);
        list_del_init(&request->rq_list);
        spin_unlock(&pool->prp_lock);
 
@@ -686,17 +701,14 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
 void ptlrpc_add_unreplied(struct ptlrpc_request *req)
 {
        struct obd_import *imp = req->rq_import;
-       struct list_head *tmp;
        struct ptlrpc_request *iter;
 
        assert_spin_locked(&imp->imp_lock);
        LASSERT(list_empty(&req->rq_unreplied_list));
 
        /* unreplied list is sorted by xid in ascending order */
-       list_for_each_prev(tmp, &imp->imp_unreplied_list) {
-               iter = list_entry(tmp, struct ptlrpc_request,
-                                 rq_unreplied_list);
-
+       list_for_each_entry_reverse(iter, &imp->imp_unreplied_list,
+                                   rq_unreplied_list) {
                LASSERT(req->rq_xid != iter->rq_xid);
                if (req->rq_xid < iter->rq_xid)
                        continue;
@@ -829,11 +841,12 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                }
 
                if (fail_t) {
-                       *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
+                       *fail_t = ktime_get_real_seconds() +
+                                 PTLRPC_REQ_LONG_UNLINK;
 
                        if (fail2_t)
                                *fail2_t = ktime_get_real_seconds() +
-                                          LONG_UNLINK;
+                                          PTLRPC_REQ_LONG_UNLINK;
 
                        /*
                         * The RPC is infected, let the test to change the
@@ -888,7 +901,7 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
        if (request) {
                ptlrpc_cli_req_init(request);
 
-               LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
+               LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp);
                LASSERT(imp != LP_POISON);
                LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
                         imp->imp_client);
@@ -1089,8 +1102,7 @@ struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
  */
 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
-       struct list_head *next;
+       struct ptlrpc_request *req;
        int expected_phase;
        int n = 0;
 
@@ -1099,11 +1111,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
        /* Requests on the set should either all be completed, or all be new */
        expected_phase = (atomic_read(&set->set_remaining) == 0) ?
                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
-       list_for_each(tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                LASSERT(req->rq_phase == expected_phase);
                n++;
        }
@@ -1112,10 +1120,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 atomic_read(&set->set_remaining) == n, "%d / %d\n",
                 atomic_read(&set->set_remaining), n);
 
-       list_for_each_safe(tmp, next, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
+       while ((req = list_first_entry_or_null(&set->set_requests,
+                                              struct ptlrpc_request,
+                                              rq_set_chain))) {
                list_del_init(&req->rq_set_chain);
 
                LASSERT(req->rq_phase == expected_phase);
@@ -1274,7 +1281,7 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
                } else if (req->rq_no_delay &&
                           imp->imp_generation != imp->imp_initiated_at) {
                        /* ignore nodelay for requests initiating connections */
-                       *status = -EWOULDBLOCK;
+                       *status = -EAGAIN;
                } else if (req->rq_allow_replay &&
                           (imp->imp_state == LUSTRE_IMP_REPLAY ||
                            imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
@@ -1392,8 +1399,8 @@ __u64 ptlrpc_known_replied_xid(struct obd_import *imp)
        if (list_empty(&imp->imp_unreplied_list))
                return 0;
 
-       req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
-                        rq_unreplied_list);
+       req = list_first_entry(&imp->imp_unreplied_list, struct ptlrpc_request,
+                              rq_unreplied_list);
        LASSERTF(req->rq_xid >= 1, "XID:%llu\n", req->rq_xid);
 
        if (imp->imp_known_replied_xid < req->rq_xid - 1)
@@ -1689,7 +1696,22 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 
        lustre_msg_set_status(req->rq_reqmsg, current->pid);
 
-       rc = sptlrpc_req_refresh_ctx(req, 0);
+       /* If the request to be sent is an LDLM callback, do not try to
+        * refresh context.
+        * An LDLM callback is sent by a server to a client in order to make
+        * it release a lock, on a communication channel that uses a reverse
+        * context. It cannot be refreshed on its own, as it is the 'reverse'
+        * (server-side) representation of a client context.
+        * We do not care if the reverse context is expired, and want to send
+        * the LDLM callback anyway. Once the client receives the AST, it is
+        * its job to refresh its own context if it has expired, hence
+        * refreshing the associated reverse context on server side, before
+        * being able to send the LDLM_CANCEL requested by the server.
+        */
+       if (lustre_msg_get_opc(req->rq_reqmsg) != LDLM_BL_CALLBACK &&
+           lustre_msg_get_opc(req->rq_reqmsg) != LDLM_CP_CALLBACK &&
+           lustre_msg_get_opc(req->rq_reqmsg) != LDLM_GL_CALLBACK)
+               rc = sptlrpc_req_refresh_ctx(req, 0);
        if (rc) {
                if (req->rq_err) {
                        req->rq_status = rc;
@@ -1769,7 +1791,7 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
  */
 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp, *next;
+       struct ptlrpc_request *req, *next;
        LIST_HEAD(comp_reqs);
        int force_timer_recalc = 0;
 
@@ -1777,10 +1799,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
        if (atomic_read(&set->set_remaining) == 0)
                RETURN(1);
 
-       list_for_each_safe(tmp, next, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
+       list_for_each_entry_safe(req, next, &set->set_requests,
+                                rq_set_chain) {
                struct obd_import *imp = req->rq_import;
                int unregistered = 0;
                int async = 1;
@@ -1863,7 +1883,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         * not corrupt any data.
                         */
                        if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
-                           ptlrpc_client_recv_or_unlink(req))
+                           ptlrpc_cli_wait_unlink(req))
                                continue;
                        if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
                            ptlrpc_client_bulk_active(req))
@@ -1901,7 +1921,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                        /*
                         * Check if we still need to wait for unlink.
                         */
-                       if (ptlrpc_client_recv_or_unlink(req) ||
+                       if (ptlrpc_cli_wait_unlink(req) ||
                            ptlrpc_client_bulk_active(req))
                                continue;
                        /* If there is no need to resend, fail it now. */
@@ -1984,6 +2004,27 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                        GOTO(interpret, req->rq_status);
                                }
 
+                               /* don't resend too fast in case of network
+                                * errors.
+                                */
+                               if (ktime_get_real_seconds() < (req->rq_sent + 1)
+                                   && req->rq_net_err && req->rq_timedout) {
+
+                                       DEBUG_REQ(D_INFO, req,
+                                                 "throttle request");
+                                       /* Don't try to resend RPC right away
+                                        * as it is likely it will fail again
+                                        * and ptlrpc_check_set() will be
+                                        * called again, keeping this thread
+                                        * busy. Instead, wait for the next
+                                        * timeout. Flag it as resend to
+                                        * ensure we don't wait to long.
+                                        */
+                                       req->rq_resend = 1;
+                                       spin_unlock(&imp->imp_lock);
+                                       continue;
+                               }
+
                                list_move_tail(&req->rq_list,
                                               &imp->imp_sending_list);
 
@@ -2116,7 +2157,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         * was good after getting the REPLY for her GET or
                         * the ACK for her PUT.
                         */
-                       DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+                       DEBUG_REQ(D_ERROR, req, "bulk transfer failed %d/%d/%d",
+                                 req->rq_status,
+                                 req->rq_bulk->bd_nob,
+                                 req->rq_bulk->bd_nob_transferred);
                        req->rq_status = -EIO;
                }
 
@@ -2300,7 +2344,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
  */
 void ptlrpc_expired_set(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
+       struct ptlrpc_request *req;
        time64_t now = ktime_get_real_seconds();
 
        ENTRY;
@@ -2309,11 +2353,7 @@ void ptlrpc_expired_set(struct ptlrpc_request_set *set)
        /*
         * A timeout expired. See which reqs it applies to...
         */
-       list_for_each(tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                /* don't expire request waiting for context */
                if (req->rq_wait_ctx)
                        continue;
@@ -2333,6 +2373,12 @@ void ptlrpc_expired_set(struct ptlrpc_request_set *set)
                 * ptlrpcd thread.
                 */
                ptlrpc_expire_one_request(req, 1);
+               /*
+                * Loops require that we resched once in a while to avoid
+                * RCU stalls and a few other problems.
+                */
+               cond_resched();
+
        }
 }
 
@@ -2343,15 +2389,12 @@ void ptlrpc_expired_set(struct ptlrpc_request_set *set)
  */
 static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
+       struct ptlrpc_request *req;
 
        LASSERT(set != NULL);
        CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
 
-       list_for_each(tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                if (req->rq_intr)
                        continue;
 
@@ -2371,16 +2414,13 @@ static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set)
  */
 time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
        time64_t now = ktime_get_real_seconds();
        int timeout = 0;
        struct ptlrpc_request *req;
        time64_t deadline;
 
        ENTRY;
-       list_for_each(tmp, &set->set_requests) {
-               req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                /* Request in-flight? */
                if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
                      (req->rq_phase == RQ_PHASE_BULK) ||
@@ -2418,7 +2458,6 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
  */
 int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
        struct ptlrpc_request *req;
        time64_t timeout;
        int rc;
@@ -2427,9 +2466,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
        if (set->set_producer)
                (void)ptlrpc_set_producer(set);
        else
-               list_for_each(tmp, &set->set_requests) {
-                       req = list_entry(tmp, struct ptlrpc_request,
-                                        rq_set_chain);
+               list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                        if (req->rq_phase == RQ_PHASE_NEW)
                                (void)ptlrpc_send_new_req(req);
                }
@@ -2493,10 +2530,10 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
                         */
                        if (rc == -ETIMEDOUT &&
                            signal_pending(current)) {
-                               sigset_t blocked_sigs;
+                               sigset_t old, new;
 
-                               cfs_block_sigsinv(LUSTRE_FATAL_SIGS,
-                                                 &blocked_sigs);
+                               siginitset(&new, LUSTRE_FATAL_SIGS);
+                               sigprocmask(SIG_BLOCK, &new, &old);
                                /*
                                 * In fact we only interrupt for the
                                 * "fatal" signals like SIGINT or
@@ -2507,7 +2544,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
                                 */
                                if (signal_pending(current))
                                        ptlrpc_interrupted_set(set);
-                               cfs_restore_sigs(&blocked_sigs);
+                               sigprocmask(SIG_SETMASK, &old, NULL);
                        }
                }
 
@@ -2523,9 +2560,8 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
                 * the error cases -eeb.
                 */
                if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
-                       list_for_each(tmp, &set->set_requests) {
-                               req = list_entry(tmp, struct ptlrpc_request,
-                                                rq_set_chain);
+                       list_for_each_entry(req, &set->set_requests,
+                                           rq_set_chain) {
                                spin_lock(&req->rq_lock);
                                req->rq_invalid_rqset = 1;
                                spin_unlock(&req->rq_lock);
@@ -2536,9 +2572,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
        LASSERT(atomic_read(&set->set_remaining) == 0);
 
        rc = set->set_rc; /* rq_status of already freed requests if any */
-       list_for_each(tmp, &set->set_requests) {
-               req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
                if (req->rq_status != 0)
                        rc = req->rq_status;
@@ -2714,6 +2748,7 @@ EXPORT_SYMBOL(ptlrpc_req_xid);
  */
 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 {
+       bool discard = false;
        /*
         * Might sleep.
         */
@@ -2723,20 +2758,23 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
            async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
                request->rq_reply_deadline = ktime_get_real_seconds() +
-                                            LONG_UNLINK;
+                                            PTLRPC_REQ_LONG_UNLINK;
 
        /*
         * Nothing left to do.
         */
-       if (!ptlrpc_client_recv_or_unlink(request))
+       if (!__ptlrpc_cli_wait_unlink(request, &discard))
                RETURN(1);
 
        LNetMDUnlink(request->rq_reply_md_h);
 
+       if (discard) /* Discard the request-out callback */
+               __LNetMDUnlink(request->rq_req_md_h, discard);
+
        /*
         * Let's check it once again.
         */
-       if (!ptlrpc_client_recv_or_unlink(request))
+       if (!ptlrpc_cli_wait_unlink(request))
                RETURN(1);
 
        /* Move to "Unregistering" phase as reply was not unlinked yet. */
@@ -2757,7 +2795,7 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
                wait_queue_head_t *wq = (request->rq_set) ?
                                        &request->rq_set->set_waitq :
                                        &request->rq_reply_waitq;
-               int seconds = LONG_UNLINK;
+               int seconds = PTLRPC_REQ_LONG_UNLINK;
                /*
                 * Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish NALs
@@ -2765,7 +2803,7 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
                while (seconds > 0 &&
                       wait_event_idle_timeout(
                               *wq,
-                              !ptlrpc_client_recv_or_unlink(request),
+                              !ptlrpc_cli_wait_unlink(request),
                               cfs_time_seconds(1)) == 0)
                        seconds -= 1;
                if (seconds > 0) {
@@ -2978,7 +3016,7 @@ EXPORT_SYMBOL(ptlrpc_request_addref);
 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                      struct obd_import *imp)
 {
-       struct list_head *tmp;
+       struct ptlrpc_request *iter;
 
        assert_spin_locked(&imp->imp_lock);
 
@@ -3006,11 +3044,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
        LASSERT(imp->imp_replayable);
        /* Balanced in ptlrpc_free_committed, usually. */
        ptlrpc_request_addref(req);
-       list_for_each_prev(tmp, &imp->imp_replay_list) {
-               struct ptlrpc_request *iter = list_entry(tmp,
-                                                        struct ptlrpc_request,
-                                                        rq_replay_list);
-
+       list_for_each_entry_reverse(iter, &imp->imp_replay_list,
+                                   rq_replay_list) {
                /*
                 * We may have duplicate transnos if we create and then
                 * open a file, or for closes retained if to match creating
@@ -3258,7 +3293,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
  */
 void ptlrpc_abort_inflight(struct obd_import *imp)
 {
-       struct list_head *tmp, *n;
+       struct ptlrpc_request *req;
        ENTRY;
 
        /*
@@ -3273,11 +3308,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
         * locked?  Also, how do we know if the requests on the list are
         * being freed at this time?
         */
-       list_for_each_safe(tmp, n, &imp->imp_sending_list) {
-               struct ptlrpc_request *req = list_entry(tmp,
-                                                       struct ptlrpc_request,
-                                                       rq_list);
-
+       list_for_each_entry(req, &imp->imp_sending_list, rq_list) {
                DEBUG_REQ(D_RPCTRACE, req, "inflight");
 
                spin_lock(&req->rq_lock);
@@ -3289,10 +3320,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                spin_unlock(&req->rq_lock);
        }
 
-       list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request, rq_list);
-
+       list_for_each_entry(req, &imp->imp_delayed_list, rq_list) {
                DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
 
                spin_lock(&req->rq_lock);
@@ -3319,15 +3347,11 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
  */
 void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp, *pos;
+       struct ptlrpc_request *req;
 
        LASSERT(set != NULL);
 
-       list_for_each_safe(pos, tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(pos, struct ptlrpc_request,
-                                  rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                spin_lock(&req->rq_lock);
                if (req->rq_phase != RQ_PHASE_RPC) {
                        spin_unlock(&req->rq_lock);
@@ -3400,12 +3424,11 @@ __u64 ptlrpc_next_xid(void)
  * request to ensure previous bulk fails and avoid problems with lost replies
  * and therefore several transfers landing into the same buffer from different
  * sending attempts.
+ * Also, to avoid previous reply landing to a different sending attempt.
  */
-void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
+void ptlrpc_set_mbits(struct ptlrpc_request *req)
 {
-       struct ptlrpc_bulk_desc *bd = req->rq_bulk;
-
-       LASSERT(bd != NULL);
+       int md_count = req->rq_bulk ? req->rq_bulk->bd_md_count : 1;
 
        /*
         * Generate new matchbits for all resend requests, including
@@ -3421,7 +3444,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
                 * 'resend for the -EINPROGRESS resend'. To make it simple,
                 * we opt to generate mbits for all resend cases.
                 */
-               if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data,
+               if (OCD_HAS_FLAG(&req->rq_import->imp_connect_data,
                                 BULK_MBITS)) {
                        req->rq_mbits = ptlrpc_next_xid();
                } else {
@@ -3435,17 +3458,16 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
                        spin_unlock(&req->rq_import->imp_lock);
                        req->rq_mbits = req->rq_xid;
                }
-               CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
+               CDEBUG(D_HA, "resend with new mbits old x%llu new x%llu\n",
                       old_mbits, req->rq_mbits);
        } else if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
                /* Request being sent first time, use xid as matchbits. */
-               if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)
-                   || req->rq_mbits == 0) {
+               if (OCD_HAS_FLAG(&req->rq_import->imp_connect_data,
+                                BULK_MBITS) || req->rq_mbits == 0)
+               {
                        req->rq_mbits = req->rq_xid;
                } else {
-                       int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) /
-                                       LNET_MAX_IOV;
-                       req->rq_mbits -= total_md - 1;
+                       req->rq_mbits -= md_count - 1;
                }
        } else {
                /*
@@ -3460,8 +3482,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * that server can infer the number of bulks that were prepared,
         * see LU-1431
         */
-       req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
-                         LNET_MAX_IOV) - 1;
+       req->rq_mbits += md_count - 1;
 
        /*
         * Set rq_xid as rq_mbits to indicate the final bulk for the old
@@ -3470,7 +3491,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * It's ok to directly set the rq_xid here, since this xid bump
         * won't affect the request position in unreplied list.
         */
-       if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS))
+       if (!OCD_HAS_FLAG(&req->rq_import->imp_connect_data, BULK_MBITS))
                req->rq_xid = req->rq_mbits;
 }