LU-12567 ptlrpc: handle reply and resend reorder
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 22c831c..04fe734 100644
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 /** Implementation of client-side PortalRPC interfaces */
@@ -406,10 +405,10 @@ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
                 * resent time, but server sent back service time of original
                 * RPC.
                 */
-               CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
-                      D_ADAPTTO : D_WARNING,
-                      "Reported service time %u > total measured time %lld\n",
-                      service_timeout, now - req->rq_sent);
+               CDEBUG_LIMIT((lustre_msg_get_flags(req->rq_reqmsg) &
+                             MSG_RESENT) ?  D_ADAPTTO : D_WARNING,
+                            "Reported service time %u > total measured time %lld\n",
+                            service_timeout, now - req->rq_sent);
                return;
        }
 
@@ -551,14 +550,14 @@ void ptlrpc_request_cache_free(struct ptlrpc_request *req)
  */
 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
 {
-       struct list_head *l, *tmp;
        struct ptlrpc_request *req;
 
        LASSERT(pool != NULL);
 
        spin_lock(&pool->prp_lock);
-       list_for_each_safe(l, tmp, &pool->prp_req_list) {
-               req = list_entry(l, struct ptlrpc_request, rq_list);
+       while ((req = list_first_entry_or_null(&pool->prp_req_list,
+                                              struct ptlrpc_request,
+                                              rq_list))) {
                list_del(&req->rq_list);
                LASSERT(req->rq_reqbuf);
                LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
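The hunk above replaces an open-coded list_for_each_safe() + list_entry() walk with the
list_first_entry_or_null() drain idiom. A minimal, self-contained sketch of that idiom is
shown below, assuming standard <linux/list.h> semantics; the demo_pool/demo_req types and
the demo_pool_drain() helper are illustrative names only and are not part of the patch.

	/* Illustrative sketch only: drain a spinlock-protected list by
	 * repeatedly popping its first entry, mirroring the pattern now
	 * used in ptlrpc_free_rq_pool().
	 */
	#include <linux/list.h>
	#include <linux/slab.h>
	#include <linux/spinlock.h>

	struct demo_req {
		struct list_head dr_list;	/* linkage into demo_pool::dp_reqs */
	};

	struct demo_pool {
		spinlock_t	 dp_lock;	/* protects dp_reqs */
		struct list_head dp_reqs;	/* list of struct demo_req */
	};

	static void demo_pool_drain(struct demo_pool *pool)
	{
		struct demo_req *req;

		spin_lock(&pool->dp_lock);
		/* list_first_entry_or_null() returns NULL once the list is
		 * empty, so no separate list_empty() check is needed.
		 */
		while ((req = list_first_entry_or_null(&pool->dp_reqs,
						       struct demo_req,
						       dr_list))) {
			list_del(&req->dr_list);
			kfree(req);
		}
		spin_unlock(&pool->dp_lock);
	}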
@@ -668,8 +667,8 @@ ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
                return NULL;
        }
 
-       request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
-                            rq_list);
+       request = list_first_entry(&pool->prp_req_list, struct ptlrpc_request,
+                                  rq_list);
        list_del_init(&request->rq_list);
        spin_unlock(&pool->prp_lock);
 
@@ -702,17 +701,14 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
 void ptlrpc_add_unreplied(struct ptlrpc_request *req)
 {
        struct obd_import *imp = req->rq_import;
-       struct list_head *tmp;
        struct ptlrpc_request *iter;
 
        assert_spin_locked(&imp->imp_lock);
        LASSERT(list_empty(&req->rq_unreplied_list));
 
        /* unreplied list is sorted by xid in ascending order */
-       list_for_each_prev(tmp, &imp->imp_unreplied_list) {
-               iter = list_entry(tmp, struct ptlrpc_request,
-                                 rq_unreplied_list);
-
+       list_for_each_entry_reverse(iter, &imp->imp_unreplied_list,
+                                   rq_unreplied_list) {
                LASSERT(req->rq_xid != iter->rq_xid);
                if (req->rq_xid < iter->rq_xid)
                        continue;
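The hunk above switches ptlrpc_add_unreplied() to list_for_each_entry_reverse(); the tail of
the function (the insertion itself) lies outside the hunk. As a general illustration of the
ascending-order insert idiom, a sketch under the same <linux/list.h> assumptions is given
below; demo_node and demo_insert_sorted() are hypothetical names, not code from the patch.

	/* Illustrative sketch only: keep a list sorted in ascending key order
	 * by scanning backwards for the first element with a smaller key and
	 * inserting after it.
	 */
	#include <linux/list.h>
	#include <linux/types.h>

	struct demo_node {
		u64		 dn_key;
		struct list_head dn_list;
	};

	static void demo_insert_sorted(struct list_head *head,
				       struct demo_node *new)
	{
		struct demo_node *iter;

		/* Walk from the tail: the common case (largest key so far)
		 * inserts immediately without traversing the whole list.
		 */
		list_for_each_entry_reverse(iter, head, dn_list) {
			if (new->dn_key > iter->dn_key) {
				list_add(&new->dn_list, &iter->dn_list);
				return;
			}
		}
		/* Smallest key in the list: insert at the head. */
		list_add(&new->dn_list, head);
	}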
@@ -1106,8 +1102,7 @@ struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
  */
 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
-       struct list_head *next;
+       struct ptlrpc_request *req;
        int expected_phase;
        int n = 0;
 
@@ -1116,11 +1111,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
        /* Requests on the set should either all be completed, or all be new */
        expected_phase = (atomic_read(&set->set_remaining) == 0) ?
                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
-       list_for_each(tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                LASSERT(req->rq_phase == expected_phase);
                n++;
        }
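This hunk, like several later ones, replaces an open-coded list_for_each() cursor plus
list_entry() with the typed list_for_each_entry() iterator. A minimal sketch of the
resulting pattern follows, again assuming <linux/list.h>; struct demo_item and demo_count()
are illustrative names only.

	#include <linux/list.h>

	struct demo_item {
		int		 di_val;
		struct list_head di_chain;
	};

	/* Illustrative sketch only: walk a list with the typed iterator,
	 * which removes the separate struct list_head cursor and the
	 * explicit list_entry() call needed by the older list_for_each().
	 */
	static int demo_count(struct list_head *head)
	{
		struct demo_item *item;
		int n = 0;

		list_for_each_entry(item, head, di_chain)
			n++;

		return n;
	}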
@@ -1129,10 +1120,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 atomic_read(&set->set_remaining) == n, "%d / %d\n",
                 atomic_read(&set->set_remaining), n);
 
-       list_for_each_safe(tmp, next, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
+       while ((req = list_first_entry_or_null(&set->set_requests,
+                                              struct ptlrpc_request,
+                                              rq_set_chain))) {
                list_del_init(&req->rq_set_chain);
 
                LASSERT(req->rq_phase == expected_phase);
@@ -1409,8 +1399,8 @@ __u64 ptlrpc_known_replied_xid(struct obd_import *imp)
        if (list_empty(&imp->imp_unreplied_list))
                return 0;
 
-       req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
-                        rq_unreplied_list);
+       req = list_first_entry(&imp->imp_unreplied_list, struct ptlrpc_request,
+                              rq_unreplied_list);
        LASSERTF(req->rq_xid >= 1, "XID:%llu\n", req->rq_xid);
 
        if (imp->imp_known_replied_xid < req->rq_xid - 1)
@@ -1706,7 +1696,22 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 
        lustre_msg_set_status(req->rq_reqmsg, current->pid);
 
-       rc = sptlrpc_req_refresh_ctx(req, 0);
+       /* If the request to be sent is an LDLM callback, do not try to
+        * refresh context.
+        * An LDLM callback is sent by a server to a client in order to make
+        * it release a lock, on a communication channel that uses a reverse
+        * context. It cannot be refreshed on its own, as it is the 'reverse'
+        * (server-side) representation of a client context.
+        * We do not care if the reverse context is expired, and want to send
+        * the LDLM callback anyway. Once the client receives the AST, it is
+        * its job to refresh its own context if it has expired, hence
+        * refreshing the associated reverse context on server side, before
+        * being able to send the LDLM_CANCEL requested by the server.
+        */
+       if (lustre_msg_get_opc(req->rq_reqmsg) != LDLM_BL_CALLBACK &&
+           lustre_msg_get_opc(req->rq_reqmsg) != LDLM_CP_CALLBACK &&
+           lustre_msg_get_opc(req->rq_reqmsg) != LDLM_GL_CALLBACK)
+               rc = sptlrpc_req_refresh_ctx(req, 0);
        if (rc) {
                if (req->rq_err) {
                        req->rq_status = rc;
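The new comment explains why sptlrpc_req_refresh_ctx() is skipped for LDLM callbacks, which
travel on a reverse (server-to-client) context that cannot be refreshed from the server side.
The opcode test can also be read as a predicate, sketched below using the same helpers and
opcodes already visible in the hunk; ptlrpc_req_is_ldlm_callback() is a hypothetical name
for illustration, not a function added by the patch.

	/* Illustrative sketch only: classify a request as one of the three
	 * LDLM callback opcodes (blocking, completion, glimpse) that are
	 * sent over a reverse context and must not trigger a refresh.
	 */
	static bool ptlrpc_req_is_ldlm_callback(struct ptlrpc_request *req)
	{
		__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);

		return opc == LDLM_BL_CALLBACK || opc == LDLM_CP_CALLBACK ||
		       opc == LDLM_GL_CALLBACK;
	}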
@@ -1786,7 +1791,7 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
  */
 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp, *next;
+       struct ptlrpc_request *req, *next;
        LIST_HEAD(comp_reqs);
        int force_timer_recalc = 0;
 
@@ -1794,10 +1799,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
        if (atomic_read(&set->set_remaining) == 0)
                RETURN(1);
 
-       list_for_each_safe(tmp, next, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
+       list_for_each_entry_safe(req, next, &set->set_requests,
+                                rq_set_chain) {
                struct obd_import *imp = req->rq_import;
                int unregistered = 0;
                int async = 1;
@@ -2154,7 +2157,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         * was good after getting the REPLY for her GET or
                         * the ACK for her PUT.
                         */
-                       DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+                       DEBUG_REQ(D_ERROR, req, "bulk transfer failed %d/%d/%d",
+                                 req->rq_status,
+                                 req->rq_bulk->bd_nob,
+                                 req->rq_bulk->bd_nob_transferred);
                        req->rq_status = -EIO;
                }
 
@@ -2338,7 +2344,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
  */
 void ptlrpc_expired_set(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
+       struct ptlrpc_request *req;
        time64_t now = ktime_get_real_seconds();
 
        ENTRY;
@@ -2347,11 +2353,7 @@ void ptlrpc_expired_set(struct ptlrpc_request_set *set)
        /*
         * A timeout expired. See which reqs it applies to...
         */
-       list_for_each(tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request,
-                                  rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                /* don't expire request waiting for context */
                if (req->rq_wait_ctx)
                        continue;
@@ -2387,15 +2389,12 @@ void ptlrpc_expired_set(struct ptlrpc_request_set *set)
  */
 static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
+       struct ptlrpc_request *req;
 
        LASSERT(set != NULL);
        CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
 
-       list_for_each(tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                if (req->rq_intr)
                        continue;
 
@@ -2415,16 +2414,13 @@ static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set)
  */
 time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
        time64_t now = ktime_get_real_seconds();
        int timeout = 0;
        struct ptlrpc_request *req;
        time64_t deadline;
 
        ENTRY;
-       list_for_each(tmp, &set->set_requests) {
-               req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                /* Request in-flight? */
                if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
                      (req->rq_phase == RQ_PHASE_BULK) ||
@@ -2462,7 +2458,6 @@ time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
  */
 int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp;
        struct ptlrpc_request *req;
        time64_t timeout;
        int rc;
@@ -2471,9 +2466,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
        if (set->set_producer)
                (void)ptlrpc_set_producer(set);
        else
-               list_for_each(tmp, &set->set_requests) {
-                       req = list_entry(tmp, struct ptlrpc_request,
-                                        rq_set_chain);
+               list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                        if (req->rq_phase == RQ_PHASE_NEW)
                                (void)ptlrpc_send_new_req(req);
                }
@@ -2567,9 +2560,8 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
                 * the error cases -eeb.
                 */
                if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
-                       list_for_each(tmp, &set->set_requests) {
-                               req = list_entry(tmp, struct ptlrpc_request,
-                                                rq_set_chain);
+                       list_for_each_entry(req, &set->set_requests,
+                                           rq_set_chain) {
                                spin_lock(&req->rq_lock);
                                req->rq_invalid_rqset = 1;
                                spin_unlock(&req->rq_lock);
@@ -2580,9 +2572,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
        LASSERT(atomic_read(&set->set_remaining) == 0);
 
        rc = set->set_rc; /* rq_status of already freed requests if any */
-       list_for_each(tmp, &set->set_requests) {
-               req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
                if (req->rq_status != 0)
                        rc = req->rq_status;
@@ -3026,7 +3016,7 @@ EXPORT_SYMBOL(ptlrpc_request_addref);
 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                      struct obd_import *imp)
 {
-       struct list_head *tmp;
+       struct ptlrpc_request *iter;
 
        assert_spin_locked(&imp->imp_lock);
 
@@ -3054,11 +3044,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
        LASSERT(imp->imp_replayable);
        /* Balanced in ptlrpc_free_committed, usually. */
        ptlrpc_request_addref(req);
-       list_for_each_prev(tmp, &imp->imp_replay_list) {
-               struct ptlrpc_request *iter = list_entry(tmp,
-                                                        struct ptlrpc_request,
-                                                        rq_replay_list);
-
+       list_for_each_entry_reverse(iter, &imp->imp_replay_list,
+                                   rq_replay_list) {
                /*
                 * We may have duplicate transnos if we create and then
                 * open a file, or for closes retained if to match creating
@@ -3306,7 +3293,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
  */
 void ptlrpc_abort_inflight(struct obd_import *imp)
 {
-       struct list_head *tmp, *n;
+       struct ptlrpc_request *req;
        ENTRY;
 
        /*
@@ -3321,11 +3308,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
         * locked?  Also, how do we know if the requests on the list are
         * being freed at this time?
         */
-       list_for_each_safe(tmp, n, &imp->imp_sending_list) {
-               struct ptlrpc_request *req = list_entry(tmp,
-                                                       struct ptlrpc_request,
-                                                       rq_list);
-
+       list_for_each_entry(req, &imp->imp_sending_list, rq_list) {
                DEBUG_REQ(D_RPCTRACE, req, "inflight");
 
                spin_lock(&req->rq_lock);
@@ -3337,10 +3320,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                spin_unlock(&req->rq_lock);
        }
 
-       list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
-               struct ptlrpc_request *req =
-                       list_entry(tmp, struct ptlrpc_request, rq_list);
-
+       list_for_each_entry(req, &imp->imp_delayed_list, rq_list) {
                DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
 
                spin_lock(&req->rq_lock);
@@ -3367,15 +3347,11 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
  */
 void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 {
-       struct list_head *tmp, *pos;
+       struct ptlrpc_request *req;
 
        LASSERT(set != NULL);
 
-       list_for_each_safe(pos, tmp, &set->set_requests) {
-               struct ptlrpc_request *req =
-                       list_entry(pos, struct ptlrpc_request,
-                                  rq_set_chain);
-
+       list_for_each_entry(req, &set->set_requests, rq_set_chain) {
                spin_lock(&req->rq_lock);
                if (req->rq_phase != RQ_PHASE_RPC) {
                        spin_unlock(&req->rq_lock);
@@ -3448,12 +3424,11 @@ __u64 ptlrpc_next_xid(void)
  * request to ensure previous bulk fails and avoid problems with lost replies
  * and therefore several transfers landing into the same buffer from different
  * sending attempts.
+ * Also, to avoid previous reply landing to a different sending attempt.
  */
-void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
+void ptlrpc_set_mbits(struct ptlrpc_request *req)
 {
-       struct ptlrpc_bulk_desc *bd = req->rq_bulk;
-
-       LASSERT(bd != NULL);
+       int md_count = req->rq_bulk ? req->rq_bulk->bd_md_count : 1;
 
        /*
         * Generate new matchbits for all resend requests, including
@@ -3469,7 +3444,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
                 * 'resend for the -EINPROGRESS resend'. To make it simple,
                 * we opt to generate mbits for all resend cases.
                 */
-               if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data,
+               if (OCD_HAS_FLAG(&req->rq_import->imp_connect_data,
                                 BULK_MBITS)) {
                        req->rq_mbits = ptlrpc_next_xid();
                } else {
@@ -3483,15 +3458,16 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
                        spin_unlock(&req->rq_import->imp_lock);
                        req->rq_mbits = req->rq_xid;
                }
-               CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
+               CDEBUG(D_HA, "resend with new mbits old x%llu new x%llu\n",
                       old_mbits, req->rq_mbits);
        } else if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
                /* Request being sent first time, use xid as matchbits. */
-               if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)
-                   || req->rq_mbits == 0) {
+               if (OCD_HAS_FLAG(&req->rq_import->imp_connect_data,
+                                BULK_MBITS) || req->rq_mbits == 0)
+               {
                        req->rq_mbits = req->rq_xid;
                } else {
-                       req->rq_mbits -= bd->bd_md_count - 1;
+                       req->rq_mbits -= md_count - 1;
                }
        } else {
                /*
@@ -3506,7 +3482,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * that server can infer the number of bulks that were prepared,
         * see LU-1431
         */
-       req->rq_mbits += bd->bd_md_count - 1;
+       req->rq_mbits += md_count - 1;
 
        /*
         * Set rq_xid as rq_mbits to indicate the final bulk for the old
@@ -3515,7 +3491,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * It's ok to directly set the rq_xid here, since this xid bump
         * won't affect the request position in unreplied list.
         */
-       if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS))
+       if (!OCD_HAS_FLAG(&req->rq_import->imp_connect_data, BULK_MBITS))
                req->rq_xid = req->rq_mbits;
 }
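One way to read the first-send branch above: with BULK_MBITS negotiated, a request whose
rq_xid is X and whose bulk spans md_count MDs ends up with matchbits in the range
[X, X + md_count - 1], and the stored rq_mbits is the last value in that range, which lets
the server infer how many MDs the client prepared (LU-1431). The snippet below is a
simplified, userspace-only model of that arithmetic; it is not the ptlrpc code itself and
ignores the resend and replay branches.

	#include <stdint.h>
	#include <stdio.h>

	/* Simplified model of the first-send matchbits arithmetic only:
	 * the bulk MDs use matchbits xid .. xid + md_count - 1, and the
	 * value kept in rq_mbits is the last one in that range.
	 */
	static uint64_t first_send_final_mbits(uint64_t xid, int md_count)
	{
		uint64_t mbits = xid;		/* "use xid as matchbits" */

		mbits += md_count - 1;		/* highest matchbits of the bulk */
		return mbits;
	}

	int main(void)
	{
		/* e.g. xid 100 with a 4-MD bulk: MDs use 100..103, rq_mbits = 103 */
		printf("final mbits: %llu\n",
		       (unsigned long long)first_send_final_mbits(100, 4));
		return 0;
	}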