Whamcloud - gitweb
LU-11444 ptlrpc: resend may corrupt the data
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 1d39dd0..974969b 100644 (file)
@@ -41,6 +41,7 @@
 #include <lu_object.h>
 #include <uapi/linux/lnet/lnet-types.h>
 #include "ptlrpc_internal.h"
+#include <linux/delay.h>
 
 /* The following are visible and mutable through /sys/module/ptlrpc */
 int test_req_buffer_pressure = 0;
@@ -1024,6 +1025,30 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
        }
 }
 
+static void ptlrpc_add_exp_list_nolock(struct ptlrpc_request *req,
+                                      struct obd_export *export, bool hp)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       if (hp)
+               list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
+       else
+               list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+       if (tag && export->exp_used_slots)
+               set_bit(tag - 1, export->exp_used_slots);
+}
+
+static void ptlrpc_del_exp_list(struct ptlrpc_request *req)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       spin_lock(&req->rq_export->exp_rpc_lock);
+       list_del_init(&req->rq_exp_list);
+       if (tag && !req->rq_obsolete && req->rq_export->exp_used_slots)
+               clear_bit(tag - 1, req->rq_export->exp_used_slots);
+       spin_unlock(&req->rq_export->exp_rpc_lock);
+}
+
 /** Change request export and move hp request from old export to new */
 void ptlrpc_request_change_export(struct ptlrpc_request *req,
                                  struct obd_export *export)
@@ -1031,19 +1056,13 @@ void ptlrpc_request_change_export(struct ptlrpc_request *req,
        if (req->rq_export != NULL) {
                LASSERT(!list_empty(&req->rq_exp_list));
                /* remove rq_exp_list from last export */
-               spin_lock(&req->rq_export->exp_rpc_lock);
-               list_del_init(&req->rq_exp_list);
-               spin_unlock(&req->rq_export->exp_rpc_lock);
-               /*
-                * export has one reference already, so it`s safe to
+               ptlrpc_del_exp_list(req);
+               /* export has one reference already, so it's safe to
                 * add req to export queue here and get another
                 * reference for request later
                 */
                spin_lock(&export->exp_rpc_lock);
-               if (req->rq_ops != NULL) /* hp request */
-                       list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
-               else
-                       list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+               ptlrpc_add_exp_list_nolock(req, export, req->rq_ops != NULL);
                spin_unlock(&export->exp_rpc_lock);
 
                class_export_rpc_dec(req->rq_export);
@@ -1664,6 +1683,47 @@ found:
        return tmp;
 }
 
+#ifdef HAVE_SERVER_SUPPORT
+static void ptlrpc_server_mark_obsolete(struct ptlrpc_request *req)
+{
+       req->rq_obsolete = 1;
+}
+
+static void
+ptlrpc_server_mark_in_progress_obsolete(struct ptlrpc_request *req)
+{
+       struct ptlrpc_request   *tmp = NULL;
+       __u16                   tag;
+
+       if (!tgt_is_increasing_xid_client(req->rq_export) ||
+           req->rq_export->exp_used_slots == NULL)
+               return;
+
+       tag = lustre_msg_get_tag(req->rq_reqmsg);
+       if (tag == 0)
+               return;
+
+       if (!test_bit(tag - 1, req->rq_export->exp_used_slots))
+               return;
+
+       /* This list should not be longer than max_requests in
+        * flights on the client, so it is not all that long.
+        * Also we only hit this codepath in case of a resent
+        * request which makes it even more rarely hit */
+       list_for_each_entry(tmp, &req->rq_export->exp_reg_rpcs, rq_exp_list) {
+               if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+                   req->rq_xid > tmp->rq_xid)
+                       ptlrpc_server_mark_obsolete(tmp);
+
+       }
+       list_for_each_entry(tmp, &req->rq_export->exp_hp_rpcs, rq_exp_list) {
+               if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+                   req->rq_xid > tmp->rq_xid)
+                       ptlrpc_server_mark_obsolete(tmp);
+       }
+}
+#endif
+
 /**
  * Check if a request should be assigned with a high priority.
  *
@@ -1721,9 +1781,7 @@ static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
                if (req->rq_ops && req->rq_ops->hpreq_fini)
                        req->rq_ops->hpreq_fini(req);
 
-               spin_lock(&req->rq_export->exp_rpc_lock);
-               list_del_init(&req->rq_exp_list);
-               spin_unlock(&req->rq_export->exp_rpc_lock);
+               ptlrpc_del_exp_list(req);
        }
        EXIT;
 }
@@ -1770,7 +1828,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
        hp = rc > 0;
        ptlrpc_nrs_req_initialize(svcpt, req, hp);
 
-       if (req->rq_export != NULL) {
+       while (req->rq_export != NULL) {
                struct obd_export *exp = req->rq_export;
 
                /*
@@ -1778,7 +1836,18 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
                 * atomically
                 */
                spin_lock_bh(&exp->exp_rpc_lock);
+#ifdef HAVE_SERVER_SUPPORT
+               ptlrpc_server_mark_in_progress_obsolete(req);
+#endif
                orig = ptlrpc_server_check_resend_in_progress(req);
+               if (orig && OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE)) {
+                       spin_unlock_bh(&exp->exp_rpc_lock);
+
+                       OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
+                       msleep(4 * MSEC_PER_SEC);
+                       continue;
+               }
+
                if (orig && likely(atomic_inc_not_zero(&orig->rq_refcount))) {
                        bool linked;
 
@@ -1801,14 +1870,17 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
                                ptlrpc_at_add_timed(orig);
                        ptlrpc_server_drop_request(orig);
                        ptlrpc_nrs_req_finalize(req);
+
+                       /* don't mark slot unused for resend in progress */
+                       req->rq_obsolete = 1;
+
                        RETURN(-EBUSY);
                }
 
-               if (hp || req->rq_ops != NULL)
-                       list_add(&req->rq_exp_list, &exp->exp_hp_rpcs);
-               else
-                       list_add(&req->rq_exp_list, &exp->exp_reg_rpcs);
+               ptlrpc_add_exp_list_nolock(req, exp, hp || req->rq_ops != NULL);
+
                spin_unlock_bh(&exp->exp_rpc_lock);
+               break;
        }
 
        /*