Whamcloud - gitweb
LU-12567 ptlrpc: handle reply and resend reorder
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 0577f6a..04fe734 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 /** Implementation of client-side PortalRPC interfaces */
@@ -406,10 +405,10 @@ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
                 * resent time, but server sent back service time of original
                 * RPC.
                 */
-               CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
-                      D_ADAPTTO : D_WARNING,
-                      "Reported service time %u > total measured time %lld\n",
-                      service_timeout, now - req->rq_sent);
+               CDEBUG_LIMIT((lustre_msg_get_flags(req->rq_reqmsg) &
+                             MSG_RESENT) ?  D_ADAPTTO : D_WARNING,
+                            "Reported service time %u > total measured time %lld\n",
+                            service_timeout, now - req->rq_sent);
                return;
        }
 
@@ -668,8 +667,8 @@ ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
                return NULL;
        }
 
-       request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
-                            rq_list);
+       request = list_first_entry(&pool->prp_req_list, struct ptlrpc_request,
+                                  rq_list);
        list_del_init(&request->rq_list);
        spin_unlock(&pool->prp_lock);
 
@@ -1400,8 +1399,8 @@ __u64 ptlrpc_known_replied_xid(struct obd_import *imp)
        if (list_empty(&imp->imp_unreplied_list))
                return 0;
 
-       req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
-                        rq_unreplied_list);
+       req = list_first_entry(&imp->imp_unreplied_list, struct ptlrpc_request,
+                              rq_unreplied_list);
        LASSERTF(req->rq_xid >= 1, "XID:%llu\n", req->rq_xid);
 
        if (imp->imp_known_replied_xid < req->rq_xid - 1)
@@ -1697,7 +1696,22 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 
        lustre_msg_set_status(req->rq_reqmsg, current->pid);
 
-       rc = sptlrpc_req_refresh_ctx(req, 0);
+       /* If the request to be sent is an LDLM callback, do not try to
+        * refresh context.
+        * An LDLM callback is sent by a server to a client in order to make
+        * it release a lock, on a communication channel that uses a reverse
+        * context. It cannot be refreshed on its own, as it is the 'reverse'
+        * (server-side) representation of a client context.
+        * We do not care if the reverse context is expired, and want to send
+        * the LDLM callback anyway. Once the client receives the AST, it is
+        * its job to refresh its own context if it has expired, hence
+        * refreshing the associated reverse context on server side, before
+        * being able to send the LDLM_CANCEL requested by the server.
+        */
+       if (lustre_msg_get_opc(req->rq_reqmsg) != LDLM_BL_CALLBACK &&
+           lustre_msg_get_opc(req->rq_reqmsg) != LDLM_CP_CALLBACK &&
+           lustre_msg_get_opc(req->rq_reqmsg) != LDLM_GL_CALLBACK)
+               rc = sptlrpc_req_refresh_ctx(req, 0);
        if (rc) {
                if (req->rq_err) {
                        req->rq_status = rc;
@@ -2143,7 +2157,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         * was good after getting the REPLY for her GET or
                         * the ACK for her PUT.
                         */
-                       DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+                       DEBUG_REQ(D_ERROR, req, "bulk transfer failed %d/%d/%d",
+                                 req->rq_status,
+                                 req->rq_bulk->bd_nob,
+                                 req->rq_bulk->bd_nob_transferred);
                        req->rq_status = -EIO;
                }
 
@@ -3407,12 +3424,11 @@ __u64 ptlrpc_next_xid(void)
  * request to ensure previous bulk fails and avoid problems with lost replies
  * and therefore several transfers landing into the same buffer from different
  * sending attempts.
+ * Also, to avoid a previous reply landing in a different sending attempt.
  */
-void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
+void ptlrpc_set_mbits(struct ptlrpc_request *req)
 {
-       struct ptlrpc_bulk_desc *bd = req->rq_bulk;
-
-       LASSERT(bd != NULL);
+       int md_count = req->rq_bulk ? req->rq_bulk->bd_md_count : 1;
 
        /*
         * Generate new matchbits for all resend requests, including
@@ -3428,7 +3444,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
                 * 'resend for the -EINPROGRESS resend'. To make it simple,
                 * we opt to generate mbits for all resend cases.
                 */
-               if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data,
+               if (OCD_HAS_FLAG(&req->rq_import->imp_connect_data,
                                 BULK_MBITS)) {
                        req->rq_mbits = ptlrpc_next_xid();
                } else {
@@ -3442,15 +3458,16 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
                        spin_unlock(&req->rq_import->imp_lock);
                        req->rq_mbits = req->rq_xid;
                }
-               CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
+               CDEBUG(D_HA, "resend with new mbits old x%llu new x%llu\n",
                       old_mbits, req->rq_mbits);
        } else if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
                /* Request being sent first time, use xid as matchbits. */
-               if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)
-                   || req->rq_mbits == 0) {
+               if (OCD_HAS_FLAG(&req->rq_import->imp_connect_data,
+                                BULK_MBITS) || req->rq_mbits == 0)
+               {
                        req->rq_mbits = req->rq_xid;
                } else {
-                       req->rq_mbits -= bd->bd_md_count - 1;
+                       req->rq_mbits -= md_count - 1;
                }
        } else {
                /*
@@ -3465,7 +3482,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * that server can infer the number of bulks that were prepared,
         * see LU-1431
         */
-       req->rq_mbits += bd->bd_md_count - 1;
+       req->rq_mbits += md_count - 1;
 
        /*
         * Set rq_xid as rq_mbits to indicate the final bulk for the old
@@ -3474,7 +3491,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * It's ok to directly set the rq_xid here, since this xid bump
         * won't affect the request position in unreplied list.
         */
-       if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS))
+       if (!OCD_HAS_FLAG(&req->rq_import->imp_connect_data, BULK_MBITS))
                req->rq_xid = req->rq_mbits;
 }