Whamcloud - gitweb
LU-6808 ptlrpc: properly set "rq_xid" for 4MB IO
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index c9f293c..59f4cc5 100644 (file)
@@ -212,7 +212,7 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
        LASSERT(page != NULL);
        LASSERT(pageoffset >= 0);
        LASSERT(len > 0);
-       LASSERT(pageoffset + len <= PAGE_CACHE_SIZE);
+       LASSERT(pageoffset + len <= PAGE_SIZE);
        LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
 
        kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
@@ -731,6 +731,8 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
        request->rq_reply_cbid.cbid_arg = request;
 
        request->rq_reply_deadline = 0;
+       request->rq_bulk_deadline = 0;
+       request->rq_req_deadline = 0;
        request->rq_phase = RQ_PHASE_NEW;
        request->rq_next_phase = RQ_PHASE_UNDEFINED;
 
@@ -742,6 +744,35 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
        lustre_msg_set_opc(request->rq_reqmsg, opcode);
        ptlrpc_assign_next_xid(request);
 
+       /* Let's setup deadline for req/reply/bulk unlink for opcode. */
+       if (cfs_fail_val == opcode) {
+               time_t *fail_t = NULL, *fail2_t = NULL;
+
+               if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
+                       fail_t = &request->rq_bulk_deadline;
+               else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
+                       fail_t = &request->rq_reply_deadline;
+               else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
+                       fail_t = &request->rq_req_deadline;
+               else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) {
+                       fail_t = &request->rq_reply_deadline;
+                       fail2_t = &request->rq_bulk_deadline;
+               }
+
+               if (fail_t) {
+                       *fail_t = cfs_time_current_sec() + LONG_UNLINK;
+
+                       if (fail2_t)
+                               *fail2_t = cfs_time_current_sec() + LONG_UNLINK;
+
+                       /* The RPC is infected, let the test to change the
+                        * fail_loc */
+                       set_current_state(TASK_UNINTERRUPTIBLE);
+                       schedule_timeout(cfs_time_seconds(2));
+                       set_current_state(TASK_RUNNING);
+               }
+       }
+
        RETURN(0);
 
 out_ctx:
@@ -1263,7 +1294,7 @@ static void ptlrpc_save_versions(struct ptlrpc_request *req)
 
         LASSERT(versions);
         lustre_msg_set_versions(reqmsg, versions);
-        CDEBUG(D_INFO, "Client save versions ["LPX64"/"LPX64"]\n",
+       CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n",
                versions[0], versions[1]);
 
         EXIT;
@@ -1279,7 +1310,7 @@ __u64 ptlrpc_known_replied_xid(struct obd_import *imp)
 
        req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
                         rq_unreplied_list);
-       LASSERTF(req->rq_xid >= 1, "XID:"LPU64"\n", req->rq_xid);
+       LASSERTF(req->rq_xid >= 1, "XID:%llu\n", req->rq_xid);
 
        if (imp->imp_known_replied_xid < req->rq_xid - 1)
                imp->imp_known_replied_xid = req->rq_xid - 1;
@@ -1582,7 +1613,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         }
 
        CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc"
-              " %s:%s:%d:"LPU64":%s:%d\n", current_comm(),
+              " %s:%s:%d:%llu:%s:%d\n", current_comm(),
               imp->imp_obd->obd_uuid.uuid,
               lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
               libcfs_nid2str(imp->imp_connection->c_peer.nid),
@@ -1704,25 +1735,40 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                if (!(req->rq_phase == RQ_PHASE_RPC ||
                      req->rq_phase == RQ_PHASE_BULK ||
                      req->rq_phase == RQ_PHASE_INTERPRET ||
-                     req->rq_phase == RQ_PHASE_UNREGISTERING)) {
-                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
-                        LBUG();
-                }
+                     req->rq_phase == RQ_PHASE_UNREG_RPC ||
+                     req->rq_phase == RQ_PHASE_UNREG_BULK)) {
+                       DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
+                       LBUG();
+               }
 
-                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
-                        LASSERT(req->rq_next_phase != req->rq_phase);
-                        LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+               if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
+                   req->rq_phase == RQ_PHASE_UNREG_BULK) {
+                       LASSERT(req->rq_next_phase != req->rq_phase);
+                       LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+                       if (req->rq_req_deadline &&
+                           !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
+                               req->rq_req_deadline = 0;
+                       if (req->rq_reply_deadline &&
+                           !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
+                               req->rq_reply_deadline = 0;
+                       if (req->rq_bulk_deadline &&
+                           !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
+                               req->rq_bulk_deadline = 0;
 
-                        /*
-                         * Skip processing until reply is unlinked. We
-                         * can't return to pool before that and we can't
-                         * call interpret before that. We need to make
-                         * sure that all rdma transfers finished and will
-                         * not corrupt any data.
-                         */
-                        if (ptlrpc_client_recv_or_unlink(req) ||
-                            ptlrpc_client_bulk_active(req))
-                                continue;
+                       /*
+                        * Skip processing until reply is unlinked. We
+                        * can't return to pool before that and we can't
+                        * call interpret before that. We need to make
+                        * sure that all rdma transfers finished and will
+                        * not corrupt any data.
+                        */
+                       if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
+                           ptlrpc_client_recv_or_unlink(req))
+                               continue;
+                       if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
+                           ptlrpc_client_bulk_active(req))
+                               continue;
 
                         /*
                          * Turn fail_loc off to prevent it from looping
@@ -1990,7 +2036,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
                CDEBUG(req->rq_reqmsg != NULL ? D_RPCTRACE : 0,
                        "Completed RPC pname:cluuid:pid:xid:nid:"
-                       "opc %s:%s:%d:"LPU64":%s:%d\n",
+                       "opc %s:%s:%d:%llu:%s:%d\n",
                        current_comm(), imp->imp_obd->obd_uuid.uuid,
                        lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
@@ -2193,7 +2239,7 @@ static void ptlrpc_interrupted_set(void *data)
                        continue;
 
                if (req->rq_phase != RQ_PHASE_RPC &&
-                   req->rq_phase != RQ_PHASE_UNREGISTERING &&
+                   req->rq_phase != RQ_PHASE_UNREG_RPC &&
                    !req->rq_allow_intr)
                        continue;
 
@@ -2519,12 +2565,11 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         */
        LASSERT(!in_interrupt());
 
-       /*
-        * Let's setup deadline for reply unlink.
-        */
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
-            async && request->rq_reply_deadline == 0)
-                request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
+       /* Let's setup deadline for reply unlink. */
+       if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
+           async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
+               request->rq_reply_deadline =
+                       cfs_time_current_sec() + LONG_UNLINK;
 
         /*
          * Nothing left to do.
@@ -2540,10 +2585,8 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         if (!ptlrpc_client_recv_or_unlink(request))
                 RETURN(1);
 
-        /*
-         * Move to "Unregistering" phase as reply was not unlinked yet.
-         */
-        ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
+       /* Move to "Unregistering" phase as reply was not unlinked yet. */
+       ptlrpc_rqphase_move(request, RQ_PHASE_UNREG_RPC);
 
         /*
          * Do not wait for unlink to finish.
@@ -2635,11 +2678,11 @@ void ptlrpc_free_committed(struct obd_import *imp)
 
         if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
             imp->imp_generation == imp->imp_last_generation_checked) {
-                CDEBUG(D_INFO, "%s: skip recheck: last_committed "LPU64"\n",
+               CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n",
                        imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
                RETURN_EXIT;
         }
-        CDEBUG(D_RPCTRACE, "%s: committing for last_committed "LPU64" gen %d\n",
+       CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n",
                imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
                imp->imp_generation);
 
@@ -2678,7 +2721,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
                        continue;
                }
 
-                DEBUG_REQ(D_INFO, req, "commit (last_committed "LPU64")",
+               DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)",
                           imp->imp_peer_committed_transno);
 free_req:
                ptlrpc_free_request(req);
@@ -2898,7 +2941,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
                 LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
                          lustre_msg_get_transno(req->rq_repmsg) ||
                          lustre_msg_get_transno(req->rq_repmsg) == 0,
-                         LPX64"/"LPX64"\n",
+                        "%#llx/%#llx\n",
                          lustre_msg_get_transno(req->rq_reqmsg),
                          lustre_msg_get_transno(req->rq_repmsg));
         }
@@ -2914,8 +2957,8 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
         /* transaction number shouldn't be bigger than the latest replayed */
         if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
                 DEBUG_REQ(D_ERROR, req,
-                          "Reported transno "LPU64" is bigger than the "
-                          "replayed one: "LPU64, req->rq_transno,
+                         "Reported transno %llu is bigger than the "
+                         "replayed one: %llu", req->rq_transno,
                           lustre_msg_get_transno(req->rq_reqmsg));
                 GOTO(out, rc = -EINVAL);
         }
@@ -3206,13 +3249,16 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
        } else { /* needs to generate a new matchbits for resend */
                __u64   old_mbits = req->rq_mbits;
 
-               if ((bd->bd_import->imp_connect_data.ocd_connect_flags &
-                   OBD_CONNECT_BULK_MBITS) != 0)
+               if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)){
                        req->rq_mbits = ptlrpc_next_xid();
-               else /* old version transfers rq_xid to peer as matchbits */
-                       req->rq_mbits = req->rq_xid = ptlrpc_next_xid();
-
-               CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
+               } else {/* old version transfers rq_xid to peer as matchbits */
+                       spin_lock(&req->rq_import->imp_lock);
+                       list_del_init(&req->rq_unreplied_list);
+                       ptlrpc_assign_next_xid_nolock(req);
+                       req->rq_mbits = req->rq_xid;
+                       spin_unlock(&req->rq_import->imp_lock);
+               }
+               CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
                       old_mbits, req->rq_mbits);
        }
 
@@ -3221,6 +3267,11 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * see LU-1431 */
        req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
                          LNET_MAX_IOV) - 1;
+
+       /* Set rq_xid as rq_mbits to indicate the final bulk for the old
+        * server which does not support OBD_CONNECT_BULK_MBITS. LU-6808 */
+       if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS))
+               req->rq_xid = req->rq_mbits;
 }
 
 /**
@@ -3273,7 +3324,6 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
        req->rq_timeout         = obd_timeout;
        req->rq_sent            = cfs_time_current_sec();
        req->rq_deadline        = req->rq_sent + req->rq_timeout;
-       req->rq_reply_deadline  = req->rq_deadline;
        req->rq_phase           = RQ_PHASE_INTERPRET;
        req->rq_next_phase      = RQ_PHASE_COMPLETE;
        req->rq_xid             = ptlrpc_next_xid();