Whamcloud - gitweb
LU-8193 ptlrpc: set proper mbits for EINPROGRESS resend
[fs/lustre-release.git] / lustre / ptlrpc / niobuf.c
index 21c1301..e7a694a 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -154,7 +154,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
        struct obd_export        *exp = desc->bd_export;
        struct ptlrpc_connection *conn = exp->exp_connection;
        int                       rc = 0;
-       __u64                     xid;
+       __u64                     mbits;
        int                       posted_md;
        int                       total_md;
        lnet_md_t                 md;
@@ -173,11 +173,11 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
        /* NB total length may be 0 for a read past EOF, so we send 0
         * length bulks, since the client expects bulk events.
         *
-        * The client may not need all of the bulk XIDs for the RPC.  The RPC
-        * used the XID of the highest bulk XID needed, and the server masks
+        * The client may not need all of the bulk mbits for the RPC. The RPC
+        * used the mbits of the highest bulk mbits needed, and the server masks
         * off high bits to get bulk count for this RPC. LU-1431 */
-       xid = desc->bd_req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1);
-       total_md = desc->bd_req->rq_xid - xid + 1;
+       mbits = desc->bd_req->rq_mbits & ~((__u64)desc->bd_md_max_brw - 1);
+       total_md = desc->bd_req->rq_mbits - mbits + 1;
 
        desc->bd_md_count = total_md;
        desc->bd_failure = 0;
@@ -186,7 +186,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
        md.eq_handle = ptlrpc_eq_h;
        md.threshold = 2; /* SENT and ACK/REPLY */
 
-       for (posted_md = 0; posted_md < total_md; xid++) {
+       for (posted_md = 0; posted_md < total_md; mbits++) {
                md.options = PTLRPC_MD_OPTIONS;
 
                /* NB it's assumed that source and sink buffer frags are
@@ -217,17 +217,17 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
                if (ptlrpc_is_bulk_put_source(desc->bd_type))
                        rc = LNetPut(conn->c_self, desc->bd_mds[posted_md],
                                     LNET_ACK_REQ, conn->c_peer,
-                                    desc->bd_portal, xid, 0, 0);
+                                    desc->bd_portal, mbits, 0, 0);
                else
                        rc = LNetGet(conn->c_self, desc->bd_mds[posted_md],
-                                    conn->c_peer, desc->bd_portal, xid, 0);
+                                    conn->c_peer, desc->bd_portal, mbits, 0);
 
                posted_md++;
                if (rc != 0) {
                        CERROR("%s: failed bulk transfer with %s:%u x"LPU64": "
                               "rc = %d\n", exp->exp_obd->obd_name,
                               libcfs_id2str(conn->c_peer), desc->bd_portal,
-                              xid, rc);
+                              mbits, rc);
                        break;
                }
        }
@@ -246,9 +246,9 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
        }
 
        CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d "
-              "id %s xid "LPX64"-"LPX64"\n", desc->bd_iov_count,
+              "id %s mbits "LPX64"-"LPX64"\n", desc->bd_iov_count,
               desc->bd_nob, desc->bd_portal, libcfs_id2str(conn->c_peer),
-              xid - posted_md, xid - 1);
+              mbits - posted_md, mbits - 1);
 
        RETURN(0);
 }
@@ -306,7 +306,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        int rc2;
        int posted_md;
        int total_md;
-       __u64 xid;
+       __u64 mbits;
        lnet_handle_me_t  me_h;
        lnet_md_t         md;
        ENTRY;
@@ -335,39 +335,37 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
        LASSERT(desc->bd_cbid.cbid_arg == desc);
 
-       /* An XID is only used for a single request from the client.
-        * For retried bulk transfers, a new XID will be allocated in
-        * in ptlrpc_check_set() if it needs to be resent, so it is not
-        * using the same RDMA match bits after an error.
-        *
-        * For multi-bulk RPCs, rq_xid is the last XID needed for bulks. The
-        * first bulk XID is power-of-two aligned before rq_xid. LU-1431 */
-       xid = req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1);
+       total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
+       /* rq_mbits is matchbits of the final bulk */
+       mbits = req->rq_mbits - total_md + 1;
+
+       LASSERTF(mbits == (req->rq_mbits & PTLRPC_BULK_OPS_MASK),
+                "first mbits = x"LPU64", last mbits = x"LPU64"\n",
+                mbits, req->rq_mbits);
        LASSERTF(!(desc->bd_registered &&
                   req->rq_send_state != LUSTRE_IMP_REPLAY) ||
-                xid != desc->bd_last_xid,
-                "registered: %d  rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
-                desc->bd_registered, xid, desc->bd_last_xid);
+                mbits != desc->bd_last_mbits,
+                "registered: %d  rq_mbits: "LPU64" bd_last_mbits: "LPU64"\n",
+                desc->bd_registered, mbits, desc->bd_last_mbits);
 
-       total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
        desc->bd_registered = 1;
-       desc->bd_last_xid = xid;
+       desc->bd_last_mbits = mbits;
        desc->bd_md_count = total_md;
        md.user_ptr = &desc->bd_cbid;
        md.eq_handle = ptlrpc_eq_h;
        md.threshold = 1;                       /* PUT or GET */
 
-       for (posted_md = 0; posted_md < total_md; posted_md++, xid++) {
+       for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) {
                md.options = PTLRPC_MD_OPTIONS |
                             (ptlrpc_is_bulk_op_get(desc->bd_type) ?
                              LNET_MD_OP_GET : LNET_MD_OP_PUT);
                ptlrpc_fill_bulk_md(&md, desc, posted_md);
 
-               rc = LNetMEAttach(desc->bd_portal, peer, xid, 0,
+               rc = LNetMEAttach(desc->bd_portal, peer, mbits, 0,
                                  LNET_UNLINK, LNET_INS_AFTER, &me_h);
                if (rc != 0) {
                        CERROR("%s: LNetMEAttach failed x"LPU64"/%d: rc = %d\n",
-                              desc->bd_import->imp_obd->obd_name, xid,
+                              desc->bd_import->imp_obd->obd_name, mbits,
                               posted_md, rc);
                        break;
                }
@@ -377,7 +375,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
                                  &desc->bd_mds[posted_md]);
                if (rc != 0) {
                        CERROR("%s: LNetMDAttach failed x"LPU64"/%d: rc = %d\n",
-                              desc->bd_import->imp_obd->obd_name, xid,
+                              desc->bd_import->imp_obd->obd_name, mbits,
                               posted_md, rc);
                        rc2 = LNetMEUnlink(me_h);
                        LASSERT(rc2 == 0);
@@ -396,15 +394,8 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
                RETURN(-ENOMEM);
        }
 
-       /* Set rq_xid to matchbits of the final bulk so that server can
-        * infer the number of bulks that were prepared */
-       req->rq_xid = --xid;
-       LASSERTF(desc->bd_last_xid == (req->rq_xid & PTLRPC_BULK_OPS_MASK),
-                "bd_last_xid = x"LPU64", rq_xid = x"LPU64"\n",
-                desc->bd_last_xid, req->rq_xid);
-
        spin_lock(&desc->bd_lock);
-       /* Holler if peer manages to touch buffers before he knows the xid */
+       /* Holler if peer manages to touch buffers before he knows the mbits */
        if (desc->bd_md_count != total_md)
                CWARN("%s: Peer %s touched %d buffers while I registered\n",
                      desc->bd_import->imp_obd->obd_name, libcfs_id2str(peer),
@@ -412,10 +403,10 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        spin_unlock(&desc->bd_lock);
 
        CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, "
-              "xid x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count,
+              "mbits x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count,
               ptlrpc_is_bulk_op_get(desc->bd_type) ? "get-source" : "put-sink",
               desc->bd_iov_count, desc->bd_nob,
-              desc->bd_last_xid, req->rq_xid, desc->bd_portal);
+              desc->bd_last_mbits, req->rq_mbits, desc->bd_portal);
 
        RETURN(0);
 }
@@ -711,6 +702,38 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
        lustre_msghdr_set_flags(request->rq_reqmsg,
                                imp->imp_msghdr_flags);
 
+       /* If it's the first time to resend the request for EINPROGRESS,
+        * we need to allocate a new XID (see after_reply()), it's different
+        * from the resend for reply timeout. */
+       if (request->rq_nr_resend != 0 &&
+           list_empty(&request->rq_unreplied_list)) {
+               __u64 min_xid = 0;
+               /* resend for EINPROGRESS, allocate new xid to avoid reply
+                * reconstruction */
+               spin_lock(&imp->imp_lock);
+               ptlrpc_assign_next_xid_nolock(request);
+               min_xid = ptlrpc_known_replied_xid(imp);
+               spin_unlock(&imp->imp_lock);
+
+               lustre_msg_set_last_xid(request->rq_reqmsg, min_xid);
+               DEBUG_REQ(D_RPCTRACE, request, "Allocating new xid for "
+                         "resend on EINPROGRESS");
+       }
+
+       if (request->rq_bulk != NULL) {
+               ptlrpc_set_bulk_mbits(request);
+               lustre_msg_set_mbits(request->rq_reqmsg, request->rq_mbits);
+       }
+
+       if (list_empty(&request->rq_unreplied_list) ||
+           request->rq_xid <= imp->imp_known_replied_xid) {
+               DEBUG_REQ(D_ERROR, request, "xid: "LPU64", replied: "LPU64", "
+                         "list_empty:%d\n", request->rq_xid,
+                         imp->imp_known_replied_xid,
+                         list_empty(&request->rq_unreplied_list));
+               LBUG();
+       }
+
        /** For enabled AT all request should have AT_SUPPORT in the
         * FULL import state when OBD_CONNECT_AT is set */
        LASSERT(AT_OFF || imp->imp_state != LUSTRE_IMP_FULL ||
@@ -726,9 +749,13 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         if (request->rq_memalloc)
                 mpflag = cfs_memory_pressure_get_and_set();
 
-        rc = sptlrpc_cli_wrap_request(request);
-        if (rc)
-                GOTO(out, rc);
+       rc = sptlrpc_cli_wrap_request(request);
+       if (rc == -ENOMEM)
+               /* set rq_sent so that this request is treated
+                * as a delayed send in the upper layers */
+               request->rq_sent = cfs_time_current_sec();
+       if (rc)
+               GOTO(out, rc);
 
         /* bulk register should be done after wrap_request() */
         if (request->rq_bulk != NULL) {