Whamcloud - gitweb
LU-9120 lnet: timeout delayed REPLYs and ACKs 71/32771/15
authorAmir Shehata <amir.shehata@intel.com>
Fri, 29 Jun 2018 01:02:42 +0000 (18:02 -0700)
committerAmir Shehata <ashehata@whamcloud.com>
Fri, 17 Aug 2018 20:12:23 +0000 (20:12 +0000)
When a GET, or a PUT which requires an ACK, is sent, add a response
tracker block to a percpt queue. When the REPLY/ACK is received,
remove the block from the percpt queue. The monitor thread
wakes up periodically to check whether any of the blocks have
expired and, if so, sends a timeout event to the ULP, flags
the MD as stale, and then unlinks it.

Test-Parameters: forbuildonly
Signed-off-by: Amir Shehata <ashehata@whamcloud.com>
Change-Id: Ia219fca5a578d625819b9f9c8ee2b3aa050dce80
Reviewed-on: https://review.whamcloud.com/32771
Tested-by: Jenkins
Reviewed-by: Olaf Weber <olaf.weber@hpe.com>
Reviewed-by: Sonia Sharma <sharmaso@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c

index 0af6d32..a6799fa 100644 (file)
@@ -490,6 +490,24 @@ lnet_msg_free(struct lnet_msg *msg)
        LIBCFS_FREE(msg, sizeof(*msg));
 }
 
+static inline struct lnet_rsp_tracker *
+lnet_rspt_alloc(int cpt)
+{
+       struct lnet_rsp_tracker *rspt;
+       LIBCFS_ALLOC(rspt, sizeof(*rspt));
+       lnet_net_lock(cpt);
+       lnet_net_unlock(cpt);
+       return rspt;
+}
+
+static inline void
+lnet_rspt_free(struct lnet_rsp_tracker *rspt, int cpt)
+{
+       LIBCFS_FREE(rspt, sizeof(*rspt));
+       lnet_net_lock(cpt);
+       lnet_net_unlock(cpt);
+}
+
 void lnet_ni_free(struct lnet_ni *ni);
 void lnet_net_free(struct lnet_net *net);
 
@@ -682,6 +700,7 @@ struct lnet_msg *lnet_create_reply_msg(struct lnet_ni *ni,
                                       struct lnet_msg *get_msg);
 void lnet_set_reply_msg_len(struct lnet_ni *ni, struct lnet_msg *msg,
                            unsigned int len);
+void lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt);
 
 void lnet_finalize(struct lnet_msg *msg, int rc);
 
index 3c36ebf..e921595 100644 (file)
@@ -78,6 +78,17 @@ enum lnet_msg_hstatus {
        LNET_MSG_STATUS_NETWORK_TIMEOUT
 };
 
+struct lnet_rsp_tracker {
+       /* chain on the waiting list */
+       struct list_head rspt_on_list;
+       /* cpt to lock */
+       int rspt_cpt;
+       /* deadline of the REPLY/ACK */
+       ktime_t rspt_deadline;
+       /* parent MD */
+       struct lnet_handle_md rspt_mdh;
+};
+
 struct lnet_msg {
        struct list_head        msg_activelist;
        struct list_head        msg_list;       /* Q for credits/MD */
@@ -191,24 +202,25 @@ struct lnet_me {
 };
 
 struct lnet_libmd {
-       struct list_head        md_list;
-       struct lnet_libhandle   md_lh;
-       struct lnet_me         *md_me;
-       char                   *md_start;
-       unsigned int            md_offset;
-       unsigned int            md_length;
-       unsigned int            md_max_size;
-       int                     md_threshold;
-       int                     md_refcount;
-       unsigned int            md_options;
-       unsigned int            md_flags;
-       unsigned int            md_niov;        /* # frags at end of struct */
-       void                   *md_user_ptr;
-       struct lnet_eq         *md_eq;
-       struct lnet_handle_md   md_bulk_handle;
+       struct list_head         md_list;
+       struct lnet_libhandle    md_lh;
+       struct lnet_me          *md_me;
+       char                    *md_start;
+       unsigned int             md_offset;
+       unsigned int             md_length;
+       unsigned int             md_max_size;
+       int                      md_threshold;
+       int                      md_refcount;
+       unsigned int             md_options;
+       unsigned int             md_flags;
+       unsigned int             md_niov;       /* # frags at end of struct */
+       void                    *md_user_ptr;
+       struct lnet_rsp_tracker *md_rspt_ptr;
+       struct lnet_eq          *md_eq;
+       struct lnet_handle_md    md_bulk_handle;
        union {
-               struct kvec     iov[LNET_MAX_IOV];
-               lnet_kiov_t     kiov[LNET_MAX_IOV];
+               struct kvec      iov[LNET_MAX_IOV];
+               lnet_kiov_t      kiov[LNET_MAX_IOV];
        } md_iov;
 };
 
@@ -1085,6 +1097,14 @@ struct lnet {
        struct list_head                ln_mt_localNIRecovq;
        /* local NIs to recover */
        struct list_head                ln_mt_peerNIRecovq;
+       /*
+        * An array of queues for GET/PUT waiting for REPLY/ACK respectively.
+        * There are CPT number of queues. Since response trackers will be
+        * added on the fast path we can't afford to grab the exclusive
+        * net lock to protect these queues. The CPT will be calculated
+        * based on the mdh cookie.
+        */
+       struct list_head                **ln_mt_rstq;
        /* recovery eq handler */
        struct lnet_handle_eq           ln_mt_eqh;
 
index 12ac396..b960864 100644 (file)
@@ -2653,6 +2653,110 @@ struct lnet_mt_event_info {
        lnet_nid_t mt_nid;
 };
 
+void
+lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
+{
+       struct lnet_rsp_tracker *rspt;
+
+       /*
+        * msg has a refcount on the MD so the MD is not going away.
+        * md_rspt_ptr is protected by lnet_res_lock(cpt), cpt being that of
+        * the MD cookie; the lnet_net_lock-protected rspt queue is left alone.
+        */
+       lnet_res_lock(cpt);
+       if (!md->md_rspt_ptr) {
+               lnet_res_unlock(cpt);
+               return;
+       }
+       rspt = md->md_rspt_ptr;
+       md->md_rspt_ptr = NULL;
+
+       /* debug code */
+       LASSERT(rspt->rspt_cpt == cpt);
+
+       /*
+        * invalidate the handle to indicate that a response has been
+        * received, which will then lead the monitor thread to clean up
+        * the rspt block.
+        */
+       LNetInvalidateMDHandle(&rspt->rspt_mdh);
+       lnet_res_unlock(cpt);
+}
+
+static void
+lnet_finalize_expired_responses(bool force)
+{
+       struct lnet_libmd *md;
+       struct list_head local_queue;
+       struct lnet_rsp_tracker *rspt, *tmp;
+       int i;
+
+       if (the_lnet.ln_mt_rstq == NULL)
+               return;
+
+       cfs_cpt_for_each(i, lnet_cpt_table()) {
+               INIT_LIST_HEAD(&local_queue);
+
+               lnet_net_lock(i);
+               if (!the_lnet.ln_mt_rstq[i]) {
+                       lnet_net_unlock(i);
+                       continue;
+               }
+               list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
+               lnet_net_unlock(i);
+
+               list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
+                       /*
+                        * The rspt mdh will be invalidated when a response
+                        * is received or whenever we want to discard the
+                        * block. The monitor thread will walk the queue
+                        * and clean up any rspts with an invalid mdh.
+                        * The monitor thread will walk the queue until
+                        * the first unexpired rspt block. This means that
+                        * some rspt blocks which received their
+                        * corresponding responses will linger in the
+                        * queue until they are cleaned up eventually.
+                        */
+                       lnet_res_lock(i);
+                       if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+                               lnet_res_unlock(i);
+                               list_del_init(&rspt->rspt_on_list);
+                               lnet_rspt_free(rspt, i);
+                               continue;
+                       }
+
+                       if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 ||
+                           force) {
+                               md = lnet_handle2md(&rspt->rspt_mdh);
+                               if (!md) {
+                                       LNetInvalidateMDHandle(&rspt->rspt_mdh);
+                                       lnet_res_unlock(i);
+                                       list_del_init(&rspt->rspt_on_list);
+                                       lnet_rspt_free(rspt, i);
+                                       continue;
+                               }
+                               LASSERT(md->md_rspt_ptr == rspt);
+                               md->md_rspt_ptr = NULL;
+                               lnet_res_unlock(i);
+
+                               list_del_init(&rspt->rspt_on_list);
+
+                               CDEBUG(D_NET, "Response timed out: md = %p\n", md);
+                               LNetMDUnlink(rspt->rspt_mdh);
+                               lnet_rspt_free(rspt, i);
+                       } else {
+                               lnet_res_unlock(i);
+                               break;
+                       }
+               }
+
+               lnet_net_lock(i);
+               if (!list_empty(&local_queue))
+                       list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
+               lnet_net_unlock(i);
+       }
+}
+
 static void
 lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
 {
@@ -3146,6 +3250,8 @@ lnet_recover_peer_nis(void)
 static int
 lnet_monitor_thread(void *arg)
 {
+       int wakeup_counter = 0;
+
        /*
         * The monitor thread takes care of the following:
         *  1. Checks the aliveness of routers
@@ -3164,6 +3270,12 @@ lnet_monitor_thread(void *arg)
 
                lnet_resend_pending_msgs();
 
+               wakeup_counter++;
+               if (wakeup_counter >= lnet_transaction_timeout / 2) {
+                       lnet_finalize_expired_responses(false);
+                       wakeup_counter = 0;
+               }
+
                lnet_recover_local_nis();
 
                lnet_recover_peer_nis();
@@ -3348,6 +3460,29 @@ lnet_mt_event_handler(struct lnet_event *event)
        }
 }
 
+static int
+lnet_rsp_tracker_create(void)
+{
+       struct list_head **rstqs;
+       rstqs = lnet_create_array_of_queues();
+
+       if (!rstqs)
+               return -ENOMEM;
+
+       the_lnet.ln_mt_rstq = rstqs;
+
+       return 0;
+}
+
+static void
+lnet_rsp_tracker_clean(void)
+{
+       lnet_finalize_expired_responses(true);
+
+       cfs_percpt_free(the_lnet.ln_mt_rstq);
+       the_lnet.ln_mt_rstq = NULL;
+}
+
 int lnet_monitor_thr_start(void)
 {
        int rc = 0;
@@ -3360,6 +3495,10 @@ int lnet_monitor_thr_start(void)
        if (rc)
                return rc;
 
+       rc = lnet_rsp_tracker_create();
+       if (rc)
+               goto clean_queues;
+
        rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh);
        if (rc != 0) {
                CERROR("Can't allocate monitor thread EQ: %d\n", rc);
@@ -3394,6 +3533,7 @@ clean_thread:
        lnet_router_cleanup();
 free_mem:
        the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+       lnet_rsp_tracker_clean();
        lnet_clean_local_ni_recoveryq();
        lnet_clean_peer_ni_recoveryq();
        lnet_clean_resendqs();
@@ -3401,6 +3541,7 @@ free_mem:
        LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
        return rc;
 clean_queues:
+       lnet_rsp_tracker_clean();
        lnet_clean_local_ni_recoveryq();
        lnet_clean_peer_ni_recoveryq();
        lnet_clean_resendqs();
@@ -3426,6 +3567,7 @@ void lnet_monitor_thr_stop(void)
 
        /* perform cleanup tasks */
        lnet_router_cleanup();
+       lnet_rsp_tracker_clean();
        lnet_clean_local_ni_recoveryq();
        lnet_clean_peer_ni_recoveryq();
        lnet_clean_resendqs();
@@ -3598,13 +3740,13 @@ lnet_parse_get(struct lnet_ni *ni, struct lnet_msg *msg, int rdma_get)
 static int
 lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg)
 {
-       void             *private = msg->msg_private;
-       struct lnet_hdr  *hdr = &msg->msg_hdr;
+       void *private = msg->msg_private;
+       struct lnet_hdr *hdr = &msg->msg_hdr;
        struct lnet_process_id src = {0};
-       struct lnet_libmd        *md;
-       int               rlength;
-       int               mlength;
-       int                     cpt;
+       struct lnet_libmd *md;
+       int rlength;
+       int mlength;
+       int cpt;
 
        cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
        lnet_res_lock(cpt);
@@ -3665,10 +3807,10 @@ lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg)
 static int
 lnet_parse_ack(struct lnet_ni *ni, struct lnet_msg *msg)
 {
-       struct lnet_hdr  *hdr = &msg->msg_hdr;
+       struct lnet_hdr *hdr = &msg->msg_hdr;
        struct lnet_process_id src = {0};
-       struct lnet_libmd        *md;
-       int                     cpt;
+       struct lnet_libmd *md;
+       int cpt;
 
        src.nid = hdr->src_nid;
        src.pid = hdr->src_pid;
@@ -4178,6 +4320,43 @@ lnet_recv_delayed_msg_list(struct list_head *head)
        }
 }
 
+static void
+lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt,
+                       struct lnet_libmd *md, struct lnet_handle_md mdh)
+{
+       s64 timeout_ns;
+
+       /*
+        * MD has a refcount taken by message so it's not going away.
+        * The MD however can be looked up. We need to secure the access
+        * to the md_rspt_ptr by taking the res_lock.
+        * The rspt can be accessed without protection up to when it gets
+        * added to the list.
+        */
+
+       /* debug code */
+       LASSERT(md->md_rspt_ptr == NULL);
+
+       /* keep the mdh so the MD can be unlinked if no response arrives */
+       rspt->rspt_mdh = mdh;
+       rspt->rspt_cpt = cpt;
+       timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
+       rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns);
+
+       lnet_res_lock(cpt);
+       /* store the rspt so we can access it when we get the REPLY */
+       md->md_rspt_ptr = rspt;
+       lnet_res_unlock(cpt);
+
+       /*
+        * add to the list of tracked responses. It's added to tail of the
+        * list in order to expire all the older entries first.
+        */
+       lnet_net_lock(cpt);
+       list_add_tail(&rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]);
+       lnet_net_unlock(cpt);
+}
+
 /**
  * Initiate an asynchronous PUT operation.
  *
@@ -4228,10 +4407,11 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
        __u64 match_bits, unsigned int offset,
        __u64 hdr_data)
 {
-       struct lnet_msg         *msg;
-       struct lnet_libmd       *md;
-       int                     cpt;
-       int                     rc;
+       struct lnet_msg *msg;
+       struct lnet_libmd *md;
+       int cpt;
+       int rc;
+       struct lnet_rsp_tracker *rspt = NULL;
 
        LASSERT(the_lnet.ln_refcount > 0);
 
@@ -4251,6 +4431,17 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
        msg->msg_vmflush = !!memory_pressure_get();
 
        cpt = lnet_cpt_of_cookie(mdh.cookie);
+
+       if (ack == LNET_ACK_REQ) {
+               rspt = lnet_rspt_alloc(cpt);
+               if (!rspt) {
+                       CERROR("Dropping PUT to %s: ENOMEM on response tracker\n",
+                               libcfs_id2str(target));
+                       return -ENOMEM;
+               }
+               INIT_LIST_HEAD(&rspt->rspt_on_list);
+       }
+
        lnet_res_lock(cpt);
 
        md = lnet_handle2md(&mdh);
@@ -4263,6 +4454,7 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
                               md->md_me->me_portal);
                lnet_res_unlock(cpt);
 
+               LIBCFS_FREE(rspt, sizeof(*rspt));
                lnet_msg_free(msg);
                return -ENOENT;
        }
@@ -4295,11 +4487,15 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
+       if (ack == LNET_ACK_REQ)
+               lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
+
        rc = lnet_send(self, msg, LNET_NID_ANY);
        if (rc != 0) {
                CNETERR("Error sending PUT to %s: %d\n",
                        libcfs_id2str(target), rc);
                msg->msg_no_resend = true;
+               lnet_detach_rsp_tracker(msg->msg_md, cpt);
                lnet_finalize(msg, rc);
        }
 
@@ -4431,10 +4627,11 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
        struct lnet_process_id target, unsigned int portal,
        __u64 match_bits, unsigned int offset, bool recovery)
 {
-       struct lnet_msg         *msg;
-       struct lnet_libmd       *md;
-       int                     cpt;
-       int                     rc;
+       struct lnet_msg *msg;
+       struct lnet_libmd *md;
+       struct lnet_rsp_tracker *rspt;
+       int cpt;
+       int rc;
 
        LASSERT(the_lnet.ln_refcount > 0);
 
@@ -4447,15 +4644,24 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
        }
 
        msg = lnet_msg_alloc();
-       if (msg == NULL) {
+       if (!msg) {
                CERROR("Dropping GET to %s: ENOMEM on struct lnet_msg\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }
 
+       cpt = lnet_cpt_of_cookie(mdh.cookie);
+
+       rspt = lnet_rspt_alloc(cpt);
+       if (!rspt) {
+               CERROR("Dropping GET to %s: ENOMEM on response tracker\n",
+                      libcfs_id2str(target));
+               return -ENOMEM;
+       }
+       INIT_LIST_HEAD(&rspt->rspt_on_list);
+
        msg->msg_recovery = recovery;
 
-       cpt = lnet_cpt_of_cookie(mdh.cookie);
        lnet_res_lock(cpt);
 
        md = lnet_handle2md(&mdh);
@@ -4470,6 +4676,7 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
                lnet_res_unlock(cpt);
 
                lnet_msg_free(msg);
+               LIBCFS_FREE(rspt, sizeof(*rspt));
                return -ENOENT;
        }
 
@@ -4494,11 +4701,14 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
+       lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
+
        rc = lnet_send(self, msg, LNET_NID_ANY);
        if (rc < 0) {
                CNETERR("Error sending GET to %s: %d\n",
                        libcfs_id2str(target), rc);
                msg->msg_no_resend = true;
+               lnet_detach_rsp_tracker(msg->msg_md, cpt);
                lnet_finalize(msg, rc);
        }
 
index 8de5211..fb87c50 100644 (file)
@@ -780,6 +780,16 @@ lnet_finalize(struct lnet_msg *msg, int status)
 
        msg->msg_ev.status = status;
 
+       /*
+        * if this is an ACK or a REPLY then make sure to remove the
+        * response tracker.
+        */
+       if (msg->msg_ev.type == LNET_EVENT_REPLY ||
+           msg->msg_ev.type == LNET_EVENT_ACK) {
+               cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+               lnet_detach_rsp_tracker(msg->msg_md, cpt);
+       }
+
        /* if the message is successfully sent, no need to keep the MD around */
        if (msg->msg_md != NULL && !status)
                lnet_detach_md(msg, status);