Whamcloud - gitweb
LU-13984 ptlrpc: throttle RPC resend if network error
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 5602b58..368732a 100644 (file)
@@ -66,7 +66,7 @@ static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
        int i;
 
        for (i = 0; i < desc->bd_iov_count ; i++)
-               put_page(desc->bd_vec[i].kiov_page);
+               put_page(desc->bd_vec[i].bv_page);
 }
 
 static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc,
@@ -169,6 +169,12 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
 
        LASSERT(ops->add_kiov_frag != NULL);
 
+       if (max_brw > PTLRPC_BULK_OPS_COUNT)
+               RETURN(NULL);
+
+       if (nfrags > LNET_MAX_IOV * max_brw)
+               RETURN(NULL);
+
        OBD_ALLOC_PTR(desc);
        if (!desc)
                return NULL;
@@ -185,6 +191,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
        desc->bd_portal = portal;
        desc->bd_type = type;
        desc->bd_md_count = 0;
+       desc->bd_nob_last = LNET_MTU;
        desc->bd_frag_ops = ops;
        LASSERT(max_brw > 0);
        desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
@@ -243,7 +250,7 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
                             struct page *page, int pageoffset, int len,
                             int pin)
 {
-       lnet_kiov_t *kiov;
+       struct bio_vec *kiov;
 
        LASSERT(desc->bd_iov_count < desc->bd_max_iov);
        LASSERT(page != NULL);
@@ -253,14 +260,23 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 
        kiov = &desc->bd_vec[desc->bd_iov_count];
 
+       if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+            ((desc->bd_nob_last + len) > LNET_MTU)) {
+               desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+               desc->bd_md_count++;
+               desc->bd_nob_last = 0;
+               LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+       }
+
+       desc->bd_nob_last += len;
        desc->bd_nob += len;
 
        if (pin)
                get_page(page);
 
-       kiov->kiov_page = page;
-       kiov->kiov_offset = pageoffset;
-       kiov->kiov_len = len;
+       kiov->bv_page = page;
+       kiov->bv_offset = pageoffset;
+       kiov->bv_len = len;
 
        desc->bd_iov_count++;
 }
@@ -272,7 +288,7 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 
        LASSERT(desc != NULL);
        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
-       LASSERT(desc->bd_md_count == 0);         /* network hands off */
+       LASSERT(desc->bd_refs == 0);         /* network hands off */
        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
        LASSERT(desc->bd_frag_ops != NULL);
 
@@ -299,10 +315,6 @@ EXPORT_SYMBOL(ptlrpc_free_bulk);
  */
 void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
 {
-       __u32 serv_est;
-       int idx;
-       struct imp_at *at;
-
        LASSERT(req->rq_import);
 
        if (AT_OFF) {
@@ -317,18 +329,25 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
                req->rq_timeout = req->rq_import->imp_server_timeout ?
                                  obd_timeout / 2 : obd_timeout;
        } else {
-               at = &req->rq_import->imp_at;
+               struct imp_at *at = &req->rq_import->imp_at;
+               timeout_t serv_est;
+               int idx;
+
                idx = import_at_get_index(req->rq_import,
                                          req->rq_request_portal);
                serv_est = at_get(&at->iat_service_estimate[idx]);
+               /*
+                * Currently a 32 bit value is sent over the
+                * wire for rq_timeout so please don't change this
+                * to time64_t. The work for LU-1158 will in time
+                * replace rq_timeout with a 64 bit nanosecond value
+                */
                req->rq_timeout = at_est2timeout(serv_est);
        }
        /*
         * We could get even fancier here, using history to predict increased
         * loading...
-        */
-
-       /*
+        *
         * Let the server know what this RPC timeout is by putting it in the
         * reqmsg
         */
@@ -338,10 +357,10 @@ EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
 
 /* Adjust max service estimate based on server value */
 static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
-                                 unsigned int serv_est)
+                                 timeout_t serv_est)
 {
        int idx;
-       unsigned int oldse;
+       timeout_t oldse;
        struct imp_at *at;
 
        LASSERT(req->rq_import);
@@ -369,15 +388,16 @@ int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
 
 /* Adjust expected network latency */
 void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
-                              unsigned int service_time)
+                              timeout_t service_timeout)
 {
-       unsigned int nl, oldnl;
-       struct imp_at *at;
        time64_t now = ktime_get_real_seconds();
+       struct imp_at *at;
+       timeout_t oldnl;
+       timeout_t nl;
 
        LASSERT(req->rq_import);
 
-       if (service_time > now - req->rq_sent + 3) {
+       if (service_timeout > now - req->rq_sent + 3) {
                /*
                 * b=16408, however, this can also happen if early reply
                 * is lost and client RPC is expired and resent, early reply
@@ -389,13 +409,14 @@ void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
                CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
                       D_ADAPTTO : D_WARNING,
                       "Reported service time %u > total measured time %lld\n",
-                      service_time, now - req->rq_sent);
+                      service_timeout, now - req->rq_sent);
                return;
        }
 
-       /* Network latency is total time less server processing time */
-       nl = max_t(int, now - req->rq_sent -
-                       service_time, 0) + 1; /* st rounding */
+       /* Network latency is total time less server processing time,
+        * st rounding
+        */
+       nl = max_t(timeout_t, now - req->rq_sent - service_timeout, 0) + 1;
        at = &req->rq_import->imp_at;
 
        oldnl = at_measured(&at->iat_net_latency, nl);
@@ -437,6 +458,7 @@ static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
 __must_hold(&req->rq_lock)
 {
        struct ptlrpc_request *early_req;
+       timeout_t service_timeout;
        time64_t olddl;
        int rc;
 
@@ -468,8 +490,8 @@ __must_hold(&req->rq_lock)
        lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
 
        /* Network latency can be adjusted, it is pure network delays */
-       ptlrpc_at_adj_net_latency(req,
-                                 lustre_msg_get_service_time(early_req->rq_repmsg));
+       service_timeout = lustre_msg_get_service_timeout(early_req->rq_repmsg);
+       ptlrpc_at_adj_net_latency(req, service_timeout);
 
        sptlrpc_cli_finish_early_reply(early_req);
 
@@ -823,11 +845,12 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                }
 
                if (fail_t) {
-                       *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
+                       *fail_t = ktime_get_real_seconds() +
+                                 PTLRPC_REQ_LONG_UNLINK;
 
                        if (fail2_t)
                                *fail2_t = ktime_get_real_seconds() +
-                                          LONG_UNLINK;
+                                          PTLRPC_REQ_LONG_UNLINK;
 
                        /*
                         * The RPC is infected, let the test to change the
@@ -844,6 +867,7 @@ out_ctx:
        LASSERT(!request->rq_pool);
        sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
 out_free:
+       atomic_dec(&imp->imp_reqs);
        class_import_put(imp);
 
        return rc;
@@ -881,13 +905,14 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
        if (request) {
                ptlrpc_cli_req_init(request);
 
-               LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
+               LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp);
                LASSERT(imp != LP_POISON);
                LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
                         imp->imp_client);
                LASSERT(imp->imp_client != LP_POISON);
 
                request->rq_import = class_import_get(imp);
+               atomic_inc(&imp->imp_reqs);
        } else {
                CERROR("request allocation out of memory\n");
        }
@@ -895,6 +920,33 @@ struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
        return request;
 }
 
+static int ptlrpc_reconnect_if_idle(struct obd_import *imp)
+{
+       int rc;
+
+       /*
+        * initiate connection if needed when the import has been
+        * referenced by the new request to avoid races with disconnect.
+        * serialize this check against conditional state=IDLE
+        * in ptlrpc_disconnect_idle_interpret()
+        */
+       spin_lock(&imp->imp_lock);
+       if (imp->imp_state == LUSTRE_IMP_IDLE) {
+               imp->imp_generation++;
+               imp->imp_initiated_at = imp->imp_generation;
+               imp->imp_state = LUSTRE_IMP_NEW;
+
+               /* connect_import_locked releases imp_lock */
+               rc = ptlrpc_connect_import_locked(imp);
+               if (rc)
+                       return rc;
+               ptlrpc_pinger_add_import(imp);
+       } else {
+               spin_unlock(&imp->imp_lock);
+       }
+       return 0;
+}
+
 /**
  * Helper function for creating a request.
  * Calls __ptlrpc_request_alloc to allocate new request sturcture and inits
@@ -912,32 +964,13 @@ ptlrpc_request_alloc_internal(struct obd_import *imp,
        if (!request)
                return NULL;
 
-       /*
-        * initiate connection if needed when the import has been
-        * referenced by the new request to avoid races with disconnect
-        */
-       if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) {
-               int rc;
-
-               CDEBUG_LIMIT(imp->imp_idle_debug,
-                            "%s: reconnect after %llds idle\n",
-                            imp->imp_obd->obd_name, ktime_get_real_seconds() -
-                                                    imp->imp_last_reply_time);
-               spin_lock(&imp->imp_lock);
-               if (imp->imp_state == LUSTRE_IMP_IDLE) {
-                       imp->imp_generation++;
-                       imp->imp_initiated_at = imp->imp_generation;
-                       imp->imp_state = LUSTRE_IMP_NEW;
-
-                       /* connect_import_locked releases imp_lock */
-                       rc = ptlrpc_connect_import_locked(imp);
-                       if (rc < 0) {
-                               ptlrpc_request_free(request);
-                               return NULL;
-                       }
-                       ptlrpc_pinger_add_import(imp);
-               } else {
-                       spin_unlock(&imp->imp_lock);
+       /* don't make expensive check for idling connection
+        * if it's already connected */
+       if (unlikely(imp->imp_state != LUSTRE_IMP_FULL)) {
+               if (ptlrpc_reconnect_if_idle(imp) < 0) {
+                       atomic_dec(&imp->imp_reqs);
+                       ptlrpc_request_free(request);
+                       return NULL;
                }
        }
 
@@ -1503,7 +1536,7 @@ static int after_reply(struct ptlrpc_request *req)
                CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
        ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
        ptlrpc_at_adj_net_latency(req,
-                                 lustre_msg_get_service_time(req->rq_repmsg));
+                                 lustre_msg_get_service_timeout(req->rq_repmsg));
 
        rc = ptlrpc_check_status(req);
 
@@ -1968,6 +2001,27 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                        GOTO(interpret, req->rq_status);
                                }
 
+                               /* don't resend too fast in case of network
+                                * errors.
+                                */
+                               if (ktime_get_real_seconds() < (req->rq_sent + 1)
+                                   && req->rq_net_err && req->rq_timedout) {
+
+                                       DEBUG_REQ(D_INFO, req,
+                                                 "throttle request");
+                                       /* Don't try to resend RPC right away
+                                        * as it is likely it will fail again
+                                        * and ptlrpc_check_set() will be
+                                        * called again, keeping this thread
+                                        * busy. Instead, wait for the next
+                                        * timeout. Flag it as resend to
+                                        * ensure we don't wait to long.
+                                        */
+                                       req->rq_resend = 1;
+                                       spin_unlock(&imp->imp_lock);
+                                       continue;
+                               }
+
                                list_move_tail(&req->rq_list,
                                               &imp->imp_sending_list);
 
@@ -2225,7 +2279,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
                       req->rq_real_sent < req->rq_sent ||
                       req->rq_real_sent >= req->rq_deadline) ?
                      "timed out for sent delay" : "timed out for slow reply"),
-                 (s64)req->rq_sent, (s64)req->rq_real_sent);
+                 req->rq_sent, req->rq_real_sent);
 
        if (imp && obd_debug_peer_on_timeout)
                LNetDebugPeer(imp->imp_connection->c_peer);
@@ -2477,10 +2531,10 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
                         */
                        if (rc == -ETIMEDOUT &&
                            signal_pending(current)) {
-                               sigset_t blocked_sigs;
+                               sigset_t old, new;
 
-                               cfs_block_sigsinv(LUSTRE_FATAL_SIGS,
-                                                 &blocked_sigs);
+                               siginitset(&new, LUSTRE_FATAL_SIGS);
+                               sigprocmask(SIG_BLOCK, &new, &old);
                                /*
                                 * In fact we only interrupt for the
                                 * "fatal" signals like SIGINT or
@@ -2491,7 +2545,7 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
                                 */
                                if (signal_pending(current))
                                        ptlrpc_interrupted_set(set);
-                               cfs_restore_sigs(&blocked_sigs);
+                               sigprocmask(SIG_SETMASK, &old, NULL);
                        }
                }
 
@@ -2580,6 +2634,10 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                sptlrpc_cli_free_repbuf(request);
 
        if (request->rq_import) {
+               if (!ptlrpcd_check_work(request)) {
+                       LASSERT(atomic_read(&request->rq_import->imp_reqs) > 0);
+                       atomic_dec(&request->rq_import->imp_reqs);
+               }
                class_import_put(request->rq_import);
                request->rq_import = NULL;
        }
@@ -2703,7 +2761,7 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
            async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
                request->rq_reply_deadline = ktime_get_real_seconds() +
-                                            LONG_UNLINK;
+                                            PTLRPC_REQ_LONG_UNLINK;
 
        /*
         * Nothing left to do.
@@ -2737,7 +2795,7 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
                wait_queue_head_t *wq = (request->rq_set) ?
                                        &request->rq_set->set_waitq :
                                        &request->rq_reply_waitq;
-               int seconds = LONG_UNLINK;
+               int seconds = PTLRPC_REQ_LONG_UNLINK;
                /*
                 * Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish NALs
@@ -3219,8 +3277,8 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
        ptlrpc_at_set_req_timeout(req);
 
        /* Tell server net_latency to calculate how long to wait for reply. */
-       lustre_msg_set_service_time(req->rq_reqmsg,
-                                   ptlrpc_at_get_net_latency(req));
+       lustre_msg_set_service_timeout(req->rq_reqmsg,
+                                      ptlrpc_at_get_net_latency(req));
        DEBUG_REQ(D_HA, req, "REPLAY");
 
        atomic_inc(&req->rq_import->imp_replay_inflight);
@@ -3423,9 +3481,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
                    || req->rq_mbits == 0) {
                        req->rq_mbits = req->rq_xid;
                } else {
-                       int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) /
-                                       LNET_MAX_IOV;
-                       req->rq_mbits -= total_md - 1;
+                       req->rq_mbits -= bd->bd_md_count - 1;
                }
        } else {
                /*
@@ -3440,8 +3496,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * that server can infer the number of bulks that were prepared,
         * see LU-1431
         */
-       req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
-                         LNET_MAX_IOV) - 1;
+       req->rq_mbits += bd->bd_md_count - 1;
 
        /*
         * Set rq_xid as rq_mbits to indicate the final bulk for the old