Whamcloud - gitweb
LU-5951 ptlrpc: track unreplied requests
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index a791abf..842d158 100644 (file)
@@ -651,6 +651,42 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
        spin_unlock(&pool->prp_lock);
 }
 
+void ptlrpc_add_unreplied(struct ptlrpc_request *req)
+{
+       struct obd_import       *imp = req->rq_import;
+       struct list_head        *tmp;
+       struct ptlrpc_request   *iter;
+
+       assert_spin_locked(&imp->imp_lock);
+       LASSERT(list_empty(&req->rq_unreplied_list));
+
+       /* unreplied list is sorted by xid in ascending order */
+       list_for_each_prev(tmp, &imp->imp_unreplied_list) {
+               iter = list_entry(tmp, struct ptlrpc_request,
+                                 rq_unreplied_list);
+
+               LASSERT(req->rq_xid != iter->rq_xid);
+               if (req->rq_xid < iter->rq_xid)
+                       continue;
+               list_add(&req->rq_unreplied_list, &iter->rq_unreplied_list);
+               return;
+       }
+       list_add(&req->rq_unreplied_list, &imp->imp_unreplied_list);
+}
+
+void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req)
+{
+       req->rq_xid = ptlrpc_next_xid();
+       ptlrpc_add_unreplied(req);
+}
+
+static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_import->imp_lock);
+       ptlrpc_assign_next_xid_nolock(req);
+       spin_unlock(&req->rq_import->imp_lock);
+}
+
 static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                                       __u32 version, int opcode,
                                       int count, __u32 *lengths, char **bufs,
@@ -697,6 +733,7 @@ static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
         ptlrpc_at_set_req_timeout(request);
 
        lustre_msg_set_opc(request->rq_reqmsg, opcode);
+       ptlrpc_assign_next_xid(request);
 
        RETURN(0);
 out_ctx:
@@ -1270,6 +1307,24 @@ static void ptlrpc_save_versions(struct ptlrpc_request *req)
         EXIT;
 }
 
+__u64 ptlrpc_known_replied_xid(struct obd_import *imp)
+{
+       struct ptlrpc_request *req;
+
+       assert_spin_locked(&imp->imp_lock);
+       if (list_empty(&imp->imp_unreplied_list))
+               return 0;
+
+       req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
+                        rq_unreplied_list);
+       LASSERTF(req->rq_xid >= 1, "XID:"LPU64"\n", req->rq_xid);
+
+       if (imp->imp_known_replied_xid < req->rq_xid - 1)
+               imp->imp_known_replied_xid = req->rq_xid - 1;
+
+       return req->rq_xid - 1;
+}
+
 /**
  * Callback function called when client receives RPC reply for \a req.
  * Returns 0 on success or error code.
@@ -1357,6 +1412,11 @@ static int after_reply(struct ptlrpc_request *req)
                else
                        req->rq_sent = now + req->rq_nr_resend;
 
+               /* Resend for EINPROGRESS will use a new XID */
+               spin_lock(&imp->imp_lock);
+               list_del_init(&req->rq_unreplied_list);
+               spin_unlock(&imp->imp_lock);
+
                RETURN(0);
        }
 
@@ -1472,8 +1532,7 @@ static int after_reply(struct ptlrpc_request *req)
 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 {
         struct obd_import     *imp = req->rq_import;
-       struct list_head      *tmp;
-       __u64                  min_xid = ~0ULL;
+       __u64                  min_xid = 0;
         int rc;
         ENTRY;
 
@@ -1494,15 +1553,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 
        spin_lock(&imp->imp_lock);
 
-       /* the very first time we assign XID. it's important to assign XID
-        * and put it on the list atomically, so that the lowest assigned
-        * XID is always known. this is vital for multislot last_rcvd */
-       if (req->rq_send_state == LUSTRE_IMP_REPLAY) {
-               LASSERT(req->rq_xid != 0);
-       } else {
-               LASSERT(req->rq_xid == 0);
-               req->rq_xid = ptlrpc_next_xid();
-       }
+       LASSERT(req->rq_xid != 0);
+       LASSERT(!list_empty(&req->rq_unreplied_list));
 
        if (!req->rq_generation_set)
                req->rq_import_generation = imp->imp_generation;
@@ -1534,23 +1586,23 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
        list_add_tail(&req->rq_list, &imp->imp_sending_list);
        atomic_inc(&req->rq_import->imp_inflight);
 
-       /* find the lowest unreplied XID */
-       list_for_each(tmp, &imp->imp_delayed_list) {
-               struct ptlrpc_request *r;
-               r = list_entry(tmp, struct ptlrpc_request, rq_list);
-               if (r->rq_xid < min_xid)
-                       min_xid = r->rq_xid;
-       }
-       list_for_each(tmp, &imp->imp_sending_list) {
-               struct ptlrpc_request *r;
-               r = list_entry(tmp, struct ptlrpc_request, rq_list);
-               if (r->rq_xid < min_xid)
-                       min_xid = r->rq_xid;
-       }
+       /* find the known replied XID from the unreplied list, CONNECT
+        * and DISCONNECT requests are skipped to make the sanity check
+        * on server side happy. See process_req_last_xid().
+        *
+        * For CONNECT: Because replay requests have lower XID, it'll
+        * break the sanity check if CONNECT bump the exp_last_xid on
+        * server.
+        *
+        * For DISCONNECT: Since client will abort inflight RPC before
+        * sending DISCONNECT, DISCONNECT may carry an XID which is higher
+        * than the inflight RPC.
+        */
+       if (!ptlrpc_req_is_connect(req) && !ptlrpc_req_is_disconnect(req))
+               min_xid = ptlrpc_known_replied_xid(imp);
        spin_unlock(&imp->imp_lock);
 
-       if (likely(min_xid != ~0ULL))
-               lustre_msg_set_last_xid(req->rq_reqmsg, min_xid - 1);
+       lustre_msg_set_last_xid(req->rq_reqmsg, min_xid);
 
        lustre_msg_set_status(req->rq_reqmsg, current_pid());
 
@@ -1978,6 +2030,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                        list_del_init(&req->rq_list);
                        atomic_dec(&imp->imp_inflight);
                }
+               list_del_init(&req->rq_unreplied_list);
                spin_unlock(&imp->imp_lock);
 
                atomic_dec(&set->set_remaining);
@@ -2375,6 +2428,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                if (!locked)
                        spin_lock(&request->rq_import->imp_lock);
                list_del_init(&request->rq_replay_list);
+               list_del_init(&request->rq_unreplied_list);
                if (!locked)
                        spin_unlock(&request->rq_import->imp_lock);
         }
@@ -3122,7 +3176,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
 
        LASSERT(bd != NULL);
 
-       if (!req->rq_resend || req->rq_nr_resend != 0) {
+       if (!req->rq_resend) {
                /* this request has a new xid, just use it as bulk matchbits */
                req->rq_mbits = req->rq_xid;