Whamcloud - gitweb
LU-793 ptlrpc: allow client to reconnect with RPC in progress
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 4d2ce16..08827eb 100644 (file)
@@ -1520,6 +1520,45 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
        RETURN(1); /* return "did_something" for liblustre */
 }
 
+/* Check if we are already handling earlier incarnation of this request.
+ * Called under &req->rq_export->exp_rpc_lock locked */
+static int ptlrpc_server_check_resend_in_progress(struct ptlrpc_request *req)
+{
+       struct ptlrpc_request   *tmp = NULL;
+
+       if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ||
+           (cfs_atomic_read(&req->rq_export->exp_rpc_count) == 0))
+               return 0;
+
+       /* bulk request are aborted upon reconnect, don't try to
+        * find a match */
+       if (req->rq_bulk_write || req->rq_bulk_read)
+               return 0;
+
+       /* This list should not be longer than max_requests in
+        * flights on the client, so it is not all that long.
+        * Also we only hit this codepath in case of a resent
+        * request which makes it even more rarely hit */
+       cfs_list_for_each_entry(tmp, &req->rq_export->exp_reg_rpcs,
+                               rq_exp_list) {
+               /* Found duplicate one */
+               if (tmp->rq_xid == req->rq_xid)
+                       goto found;
+       }
+       cfs_list_for_each_entry(tmp, &req->rq_export->exp_hp_rpcs,
+                               rq_exp_list) {
+               /* Found duplicate one */
+               if (tmp->rq_xid == req->rq_xid)
+                       goto found;
+       }
+       return 0;
+
+found:
+       DEBUG_REQ(D_HA, req, "Found duplicate req in processing\n");
+       DEBUG_REQ(D_HA, tmp, "Request being processed\n");
+       return -EBUSY;
+}
+
 /**
  * Put the request to the export list if the request may become
  * a high priority one.
@@ -1527,7 +1566,9 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
 static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
                                    struct ptlrpc_request *req)
 {
-       int rc = 0;
+       cfs_list_t      *list;
+       int              rc, hp = 0;
+
        ENTRY;
 
        if (svcpt->scp_service->srv_ops.so_hpreq_handler) {
@@ -1536,48 +1577,61 @@ static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
                        RETURN(rc);
                LASSERT(rc == 0);
        }
-       if (req->rq_export && req->rq_ops) {
-               /* Perform request specific check. We should do this check
-                * before the request is added into exp_hp_rpcs list otherwise
-                * it may hit swab race at LU-1044. */
-               if (req->rq_ops->hpreq_check) {
-                       rc = req->rq_ops->hpreq_check(req);
-                       /**
-                        * XXX: Out of all current
-                        * ptlrpc_hpreq_ops::hpreq_check(), only
-                        * ldlm_cancel_hpreq_check() can return an error code;
-                        * other functions assert in similar places, which seems
-                        * odd. What also does not seem right is that handlers
-                        * for those RPCs do not assert on the same checks, but
-                        * rather handle the error cases. e.g. see
-                        * ost_rw_hpreq_check(), and ost_brw_read(),
-                        * ost_brw_write().
-                        */
-                       if (rc < 0)
-                               RETURN(rc);
-                       LASSERT(rc == 0 || rc == 1);
+       if (req->rq_export) {
+               if (req->rq_ops) {
+                       /* Perform request specific check. We should do this
+                        * check before the request is added into exp_hp_rpcs
+                        * list otherwise it may hit swab race at LU-1044. */
+                       if (req->rq_ops->hpreq_check) {
+                               rc = req->rq_ops->hpreq_check(req);
+                               /**
+                                * XXX: Out of all current
+                                * ptlrpc_hpreq_ops::hpreq_check(), only
+                                * ldlm_cancel_hpreq_check() can return an
+                                * error code; other functions assert in
+                                * similar places, which seems odd.
+                                * What also does not seem right is that
+                                * handlers for those RPCs do not assert
+                                * on the same checks, but rather handle the
+                                * error cases. e.g. see ost_rw_hpreq_check(),
+                                * and ost_brw_read(), ost_brw_write().
+                                */
+                               if (rc < 0)
+                                       RETURN(rc);
+                               LASSERT(rc == 0 || rc == 1);
+                               hp = rc;
+                       }
+                       list = &req->rq_export->exp_hp_rpcs;
+               } else {
+                       list = &req->rq_export->exp_reg_rpcs;
                }
 
+               /* do search for duplicated xid and the adding to the list
+                * atomically */
                spin_lock_bh(&req->rq_export->exp_rpc_lock);
-               cfs_list_add(&req->rq_exp_list,
-                            &req->rq_export->exp_hp_rpcs);
+               rc = ptlrpc_server_check_resend_in_progress(req);
+               if (rc < 0) {
+                       spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+                       RETURN(rc);
+               }
+               cfs_list_add(&req->rq_exp_list, list);
                spin_unlock_bh(&req->rq_export->exp_rpc_lock);
        }
 
-       ptlrpc_nrs_req_initialize(svcpt, req, rc);
+       ptlrpc_nrs_req_initialize(svcpt, req, !!hp);
 
-       RETURN(rc);
+       RETURN(hp);
 }
 
 /** Remove the request from the export list. */
 static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
 {
-        ENTRY;
-        if (req->rq_export && req->rq_ops) {
-                /* refresh lock timeout again so that client has more
-                 * room to send lock cancel RPC. */
-                if (req->rq_ops->hpreq_fini)
-                        req->rq_ops->hpreq_fini(req);
+       ENTRY;
+       if (req->rq_export) {
+               /* refresh lock timeout again so that client has more
+                * room to send lock cancel RPC. */
+               if (req->rq_ops && req->rq_ops->hpreq_fini)
+                       req->rq_ops->hpreq_fini(req);
 
                spin_lock_bh(&req->rq_export->exp_rpc_lock);
                cfs_list_del_init(&req->rq_exp_list);