Whamcloud - gitweb
LU-8658 ptlrpc: Suppress error for flock requests
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 16a72d3..f340fc2 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -128,19 +124,21 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
                (ptlrpc_is_bulk_desc_kvec(type) &&
                 ops->add_iov_frag != NULL));
 
+       OBD_ALLOC_PTR(desc);
+       if (desc == NULL)
+               return NULL;
        if (type & PTLRPC_BULK_BUF_KIOV) {
-               OBD_ALLOC(desc,
-                         offsetof(struct ptlrpc_bulk_desc,
-                                  bd_u.bd_kiov.bd_vec[nfrags]));
+               OBD_ALLOC_LARGE(GET_KIOV(desc),
+                               nfrags * sizeof(*GET_KIOV(desc)));
+               if (GET_KIOV(desc) == NULL)
+                       goto out;
        } else {
-               OBD_ALLOC(desc,
-                         offsetof(struct ptlrpc_bulk_desc,
-                                  bd_u.bd_kvec.bd_kvec[nfrags]));
+               OBD_ALLOC_LARGE(GET_KVEC(desc),
+                               nfrags * sizeof(*GET_KVEC(desc)));
+               if (GET_KVEC(desc) == NULL)
+                       goto out;
        }
 
-       if (!desc)
-               return NULL;
-
        spin_lock_init(&desc->bd_lock);
        init_waitqueue_head(&desc->bd_waitq);
        desc->bd_max_iov = nfrags;
@@ -157,6 +155,9 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
                LNetInvalidateHandle(&desc->bd_mds[i]);
 
        return desc;
+out:
+       OBD_FREE_PTR(desc);
+       return NULL;
 }
 
 /**
@@ -207,7 +208,7 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
        LASSERT(page != NULL);
        LASSERT(pageoffset >= 0);
        LASSERT(len > 0);
-       LASSERT(pageoffset + len <= PAGE_CACHE_SIZE);
+       LASSERT(pageoffset + len <= PAGE_SIZE);
        LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
 
        kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
@@ -215,7 +216,7 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
        desc->bd_nob += len;
 
        if (pin)
-               page_cache_get(page);
+               get_page(page);
 
        kiov->kiov_page = page;
        kiov->kiov_offset = pageoffset;
@@ -271,13 +272,12 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
                desc->bd_frag_ops->release_frags(desc);
 
        if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
-               OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
-                                       bd_u.bd_kiov.bd_vec[desc->bd_max_iov]));
+               OBD_FREE_LARGE(GET_KIOV(desc),
+                       desc->bd_max_iov * sizeof(*GET_KIOV(desc)));
        else
-               OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
-                                       bd_u.bd_kvec.bd_kvec[desc->
-                                               bd_max_iov]));
-
+               OBD_FREE_LARGE(GET_KVEC(desc),
+                       desc->bd_max_iov * sizeof(*GET_KVEC(desc)));
+       OBD_FREE_PTR(desc);
        EXIT;
 }
 EXPORT_SYMBOL(ptlrpc_free_bulk);
@@ -553,10 +553,10 @@ int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
                if (!msg) {
                        ptlrpc_request_cache_free(req);
                        return i;
-                }
-                req->rq_reqbuf = msg;
-                req->rq_reqbuf_len = size;
-                req->rq_pool = pool;
+               }
+               req->rq_reqbuf = msg;
+               req->rq_reqbuf_len = size;
+               req->rq_pool = pool;
                spin_lock(&pool->prp_lock);
                list_add_tail(&req->rq_list, &pool->prp_req_list);
        }
@@ -727,6 +727,8 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
        request->rq_reply_cbid.cbid_arg = request;
 
        request->rq_reply_deadline = 0;
+       request->rq_bulk_deadline = 0;
+       request->rq_req_deadline = 0;
        request->rq_phase = RQ_PHASE_NEW;
        request->rq_next_phase = RQ_PHASE_UNDEFINED;
 
@@ -738,6 +740,35 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
        lustre_msg_set_opc(request->rq_reqmsg, opcode);
        ptlrpc_assign_next_xid(request);
 
+       /* Let's setup deadline for req/reply/bulk unlink for opcode. */
+       if (cfs_fail_val == opcode) {
+               time_t *fail_t = NULL, *fail2_t = NULL;
+
+               if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
+                       fail_t = &request->rq_bulk_deadline;
+               else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
+                       fail_t = &request->rq_reply_deadline;
+               else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
+                       fail_t = &request->rq_req_deadline;
+               else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) {
+                       fail_t = &request->rq_reply_deadline;
+                       fail2_t = &request->rq_bulk_deadline;
+               }
+
+               if (fail_t) {
+                       *fail_t = cfs_time_current_sec() + LONG_UNLINK;
+
+                       if (fail2_t)
+                               *fail2_t = cfs_time_current_sec() + LONG_UNLINK;
+
+                       /* The RPC is infected, let the test to change the
+                        * fail_loc */
+                       set_current_state(TASK_UNINTERRUPTIBLE);
+                       schedule_timeout(cfs_time_seconds(2));
+                       set_current_state(TASK_RUNNING);
+               }
+       }
+
        RETURN(0);
 
 out_ctx:
@@ -1048,6 +1079,9 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
 {
        LASSERT(list_empty(&req->rq_set_chain));
 
+       if (req->rq_allow_intr)
+               set->set_allow_intr = 1;
+
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_requests);
        req->rq_set = set;
@@ -1220,7 +1254,9 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
                lnet_nid_t nid = imp->imp_connection->c_peer.nid;
                __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
 
-               if (ptlrpc_console_allow(req))
+               /* -EAGAIN is normal when using POSIX flocks */
+               if (ptlrpc_console_allow(req) &&
+                   !(opc == LDLM_ENQUEUE && err == -EAGAIN))
                        LCONSOLE_ERROR_MSG(0x11, "%s: operation %s to node %s "
                                           "failed: rc = %d\n",
                                           imp->imp_obd->obd_name,
@@ -1256,7 +1292,7 @@ static void ptlrpc_save_versions(struct ptlrpc_request *req)
 
         LASSERT(versions);
         lustre_msg_set_versions(reqmsg, versions);
-        CDEBUG(D_INFO, "Client save versions ["LPX64"/"LPX64"]\n",
+       CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n",
                versions[0], versions[1]);
 
         EXIT;
@@ -1272,7 +1308,7 @@ __u64 ptlrpc_known_replied_xid(struct obd_import *imp)
 
        req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
                         rq_unreplied_list);
-       LASSERTF(req->rq_xid >= 1, "XID:"LPU64"\n", req->rq_xid);
+       LASSERTF(req->rq_xid >= 1, "XID:%llu\n", req->rq_xid);
 
        if (imp->imp_known_replied_xid < req->rq_xid - 1)
                imp->imp_known_replied_xid = req->rq_xid - 1;
@@ -1575,7 +1611,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         }
 
        CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc"
-              " %s:%s:%d:"LPU64":%s:%d\n", current_comm(),
+              " %s:%s:%d:%llu:%s:%d\n", current_comm(),
               imp->imp_obd->obd_uuid.uuid,
               lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
               libcfs_nid2str(imp->imp_connection->c_peer.nid),
@@ -1651,8 +1687,14 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                   rq_set_chain);
                struct obd_import *imp = req->rq_import;
                int unregistered = 0;
+               int async = 1;
                int rc = 0;
 
+               if (req->rq_phase == RQ_PHASE_COMPLETE) {
+                       list_move_tail(&req->rq_set_chain, &comp_reqs);
+                       continue;
+               }
+
                /* This schedule point is mainly for the ptlrpcd caller of this
                 * function.  Most ptlrpc sets are not long-lived and unbounded
                 * in length, but at the least the set used by the ptlrpcd is.
@@ -1669,16 +1711,18 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                        req->rq_status = -EINTR;
                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
+                       /* Since it is interpreted and we have to wait for
+                        * the reply to be unlinked, then use sync mode. */
+                       async = 0;
+
                        GOTO(interpret, req->rq_status);
                }
 
-                if (req->rq_phase == RQ_PHASE_NEW &&
-                    ptlrpc_send_new_req(req)) {
-                        force_timer_recalc = 1;
-                }
+               if (req->rq_phase == RQ_PHASE_NEW && ptlrpc_send_new_req(req))
+                       force_timer_recalc = 1;
 
-                /* delayed send - skip */
-                if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
+               /* delayed send - skip */
+               if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
                        continue;
 
                /* delayed resend - skip */
@@ -1686,29 +1730,43 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                    req->rq_sent > cfs_time_current_sec())
                        continue;
 
-                if (!(req->rq_phase == RQ_PHASE_RPC ||
-                      req->rq_phase == RQ_PHASE_BULK ||
-                      req->rq_phase == RQ_PHASE_INTERPRET ||
-                      req->rq_phase == RQ_PHASE_UNREGISTERING ||
-                      req->rq_phase == RQ_PHASE_COMPLETE)) {
-                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
-                        LBUG();
-                }
+               if (!(req->rq_phase == RQ_PHASE_RPC ||
+                     req->rq_phase == RQ_PHASE_BULK ||
+                     req->rq_phase == RQ_PHASE_INTERPRET ||
+                     req->rq_phase == RQ_PHASE_UNREG_RPC ||
+                     req->rq_phase == RQ_PHASE_UNREG_BULK)) {
+                       DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
+                       LBUG();
+               }
 
-                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
-                        LASSERT(req->rq_next_phase != req->rq_phase);
-                        LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+               if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
+                   req->rq_phase == RQ_PHASE_UNREG_BULK) {
+                       LASSERT(req->rq_next_phase != req->rq_phase);
+                       LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+                       if (req->rq_req_deadline &&
+                           !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
+                               req->rq_req_deadline = 0;
+                       if (req->rq_reply_deadline &&
+                           !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
+                               req->rq_reply_deadline = 0;
+                       if (req->rq_bulk_deadline &&
+                           !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
+                               req->rq_bulk_deadline = 0;
 
-                        /*
-                         * Skip processing until reply is unlinked. We
-                         * can't return to pool before that and we can't
-                         * call interpret before that. We need to make
-                         * sure that all rdma transfers finished and will
-                         * not corrupt any data.
-                         */
-                        if (ptlrpc_client_recv_or_unlink(req) ||
-                            ptlrpc_client_bulk_active(req))
-                                continue;
+                       /*
+                        * Skip processing until reply is unlinked. We
+                        * can't return to pool before that and we can't
+                        * call interpret before that. We need to make
+                        * sure that all rdma transfers finished and will
+                        * not corrupt any data.
+                        */
+                       if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
+                           ptlrpc_client_recv_or_unlink(req))
+                               continue;
+                       if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
+                           ptlrpc_client_bulk_active(req))
+                               continue;
 
                         /*
                          * Turn fail_loc off to prevent it from looping
@@ -1730,11 +1788,6 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         ptlrpc_rqphase_move(req, req->rq_next_phase);
                 }
 
-                if (req->rq_phase == RQ_PHASE_COMPLETE) {
-                       list_move_tail(&req->rq_set_chain, &comp_reqs);
-                        continue;
-               }
-
                 if (req->rq_phase == RQ_PHASE_INTERPRET)
                         GOTO(interpret, req->rq_status);
 
@@ -1951,27 +2004,27 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                        req->rq_status = -EIO;
                }
 
-                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
+               ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
-        interpret:
-                LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
+       interpret:
+               LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
 
-                /* This moves to "unregistering" phase we need to wait for
-                 * reply unlink. */
-                if (!unregistered && !ptlrpc_unregister_reply(req, 1)) {
-                        /* start async bulk unlink too */
-                        ptlrpc_unregister_bulk(req, 1);
-                        continue;
-                }
+               /* This moves to "unregistering" phase we need to wait for
+                * reply unlink. */
+               if (!unregistered && !ptlrpc_unregister_reply(req, async)) {
+                       /* start async bulk unlink too */
+                       ptlrpc_unregister_bulk(req, 1);
+                       continue;
+               }
 
-                if (!ptlrpc_unregister_bulk(req, 1))
-                        continue;
+               if (!ptlrpc_unregister_bulk(req, async))
+                       continue;
 
-                /* When calling interpret receiving already should be
-                 * finished. */
-                LASSERT(!req->rq_receiving_reply);
+               /* When calling interpret receiving already should be
+                * finished. */
+               LASSERT(!req->rq_receiving_reply);
 
-                ptlrpc_req_interpret(env, req, req->rq_status);
+               ptlrpc_req_interpret(env, req, req->rq_status);
 
                if (ptlrpcd_check_work(req)) {
                        atomic_dec(&set->set_remaining);
@@ -1981,7 +2034,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
                CDEBUG(req->rq_reqmsg != NULL ? D_RPCTRACE : 0,
                        "Completed RPC pname:cluuid:pid:xid:nid:"
-                       "opc %s:%s:%d:"LPU64":%s:%d\n",
+                       "opc %s:%s:%d:%llu:%s:%d\n",
                        current_comm(), imp->imp_obd->obd_uuid.uuid,
                        lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
@@ -2057,8 +2110,8 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
                       "timed out for sent delay" : "timed out for slow reply"),
                   req->rq_sent, req->rq_real_sent);
 
-        if (imp != NULL && obd_debug_peer_on_timeout)
-                LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer);
+       if (imp != NULL && obd_debug_peer_on_timeout)
+               LNetDebugPeer(imp->imp_connection->c_peer);
 
         ptlrpc_unregister_reply(req, async_unlink);
         ptlrpc_unregister_bulk(req, async_unlink);
@@ -2180,8 +2233,11 @@ static void ptlrpc_interrupted_set(void *data)
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
+               if (req->rq_intr)
+                       continue;
+
                if (req->rq_phase != RQ_PHASE_RPC &&
-                   req->rq_phase != RQ_PHASE_UNREGISTERING &&
+                   req->rq_phase != RQ_PHASE_UNREG_RPC &&
                    !req->rq_allow_intr)
                        continue;
 
@@ -2274,17 +2330,12 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                 CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
                        set, timeout);
 
-               if (timeout == 0 && !signal_pending(current))
-                        /*
-                         * No requests are in-flight (ether timed out
-                         * or delayed), so we can allow interrupts.
-                         * We still want to block for a limited time,
-                         * so we allow interrupts during the timeout.
-                         */
-                       lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1),
-                                                   ptlrpc_expired_set,
-                                                   ptlrpc_interrupted_set, set);
-               else if (set->set_allow_intr)
+               if ((timeout == 0 && !signal_pending(current)) ||
+                   set->set_allow_intr)
+                       /* No requests are in-flight (ether timed out
+                        * or delayed), so we can allow interrupts.
+                        * We still want to block for a limited time,
+                        * so we allow interrupts during the timeout. */
                        lwi = LWI_TIMEOUT_INTR_ALL(
                                        cfs_time_seconds(timeout ? timeout : 1),
                                        ptlrpc_expired_set,
@@ -2512,12 +2563,11 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         */
        LASSERT(!in_interrupt());
 
-       /*
-        * Let's setup deadline for reply unlink.
-        */
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
-            async && request->rq_reply_deadline == 0)
-                request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
+       /* Let's setup deadline for reply unlink. */
+       if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
+           async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
+               request->rq_reply_deadline =
+                       cfs_time_current_sec() + LONG_UNLINK;
 
         /*
          * Nothing left to do.
@@ -2533,10 +2583,8 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         if (!ptlrpc_client_recv_or_unlink(request))
                 RETURN(1);
 
-        /*
-         * Move to "Unregistering" phase as reply was not unlinked yet.
-         */
-        ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
+       /* Move to "Unregistering" phase as reply was not unlinked yet. */
+       ptlrpc_rqphase_move(request, RQ_PHASE_UNREG_RPC);
 
         /*
          * Do not wait for unlink to finish.
@@ -2628,11 +2676,11 @@ void ptlrpc_free_committed(struct obd_import *imp)
 
         if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
             imp->imp_generation == imp->imp_last_generation_checked) {
-                CDEBUG(D_INFO, "%s: skip recheck: last_committed "LPU64"\n",
+               CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n",
                        imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
                RETURN_EXIT;
         }
-        CDEBUG(D_RPCTRACE, "%s: committing for last_committed "LPU64" gen %d\n",
+       CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n",
                imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
                imp->imp_generation);
 
@@ -2671,7 +2719,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
                        continue;
                }
 
-                DEBUG_REQ(D_INFO, req, "commit (last_committed "LPU64")",
+               DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)",
                           imp->imp_peer_committed_transno);
 free_req:
                ptlrpc_free_request(req);
@@ -2834,9 +2882,6 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                RETURN(-ENOMEM);
        }
 
-       if (req->rq_allow_intr)
-               set->set_allow_intr = 1;
-
        /* for distributed debugging */
        lustre_msg_set_status(req->rq_reqmsg, current_pid());
 
@@ -2894,7 +2939,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
                 LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
                          lustre_msg_get_transno(req->rq_repmsg) ||
                          lustre_msg_get_transno(req->rq_repmsg) == 0,
-                         LPX64"/"LPX64"\n",
+                        "%#llx/%#llx\n",
                          lustre_msg_get_transno(req->rq_reqmsg),
                          lustre_msg_get_transno(req->rq_repmsg));
         }
@@ -2910,8 +2955,8 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
         /* transaction number shouldn't be bigger than the latest replayed */
         if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
                 DEBUG_REQ(D_ERROR, req,
-                          "Reported transno "LPU64" is bigger than the "
-                          "replayed one: "LPU64, req->rq_transno,
+                         "Reported transno %llu is bigger than the "
+                         "replayed one: %llu", req->rq_transno,
                           lustre_msg_get_transno(req->rq_reqmsg));
                 GOTO(out, rc = -EINVAL);
         }
@@ -3202,13 +3247,16 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
        } else { /* needs to generate a new matchbits for resend */
                __u64   old_mbits = req->rq_mbits;
 
-               if ((bd->bd_import->imp_connect_data.ocd_connect_flags &
-                   OBD_CONNECT_BULK_MBITS) != 0)
+               if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)){
                        req->rq_mbits = ptlrpc_next_xid();
-               else /* old version transfers rq_xid to peer as matchbits */
-                       req->rq_mbits = req->rq_xid = ptlrpc_next_xid();
-
-               CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
+               } else {/* old version transfers rq_xid to peer as matchbits */
+                       spin_lock(&req->rq_import->imp_lock);
+                       list_del_init(&req->rq_unreplied_list);
+                       ptlrpc_assign_next_xid_nolock(req);
+                       req->rq_mbits = req->rq_xid;
+                       spin_unlock(&req->rq_import->imp_lock);
+               }
+               CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
                       old_mbits, req->rq_mbits);
        }
 
@@ -3217,6 +3265,11 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * see LU-1431 */
        req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
                          LNET_MAX_IOV) - 1;
+
+       /* Set rq_xid as rq_mbits to indicate the final bulk for the old
+        * server which does not support OBD_CONNECT_BULK_MBITS. LU-6808 */
+       if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS))
+               req->rq_xid = req->rq_mbits;
 }
 
 /**
@@ -3269,7 +3322,6 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
        req->rq_timeout         = obd_timeout;
        req->rq_sent            = cfs_time_current_sec();
        req->rq_deadline        = req->rq_sent + req->rq_timeout;
-       req->rq_reply_deadline  = req->rq_deadline;
        req->rq_phase           = RQ_PHASE_INTERPRET;
        req->rq_next_phase      = RQ_PHASE_COMPLETE;
        req->rq_xid             = ptlrpc_next_xid();