Whamcloud - gitweb
LU-12568 lnet: Defer rspt cleanup when MD queued for unlink 76/35576/9
authorChris Horn <hornc@cray.com>
Sat, 20 Jul 2019 14:38:25 +0000 (09:38 -0500)
committerOleg Drokin <green@whamcloud.com>
Thu, 15 Aug 2019 07:51:07 +0000 (07:51 +0000)
When an MD is queued for unlink its lnet_libhandle is invalidated so
that future lookups of the MD fail. As a result, the monitor thread
cannot detach the response tracker from such an MD, and instead must
wait for the remaining operations on the MD to complete before it can
safely free the response tracker and remove it from the list. Freeing
the memory while there are pending operations on the MD can result
in a use after free situation when the final operation on the MD
completes and we attempt to remove the response tracker from the MD
via the lnet_msg_detach_md()->lnet_detach_rsp_tracker() call chain.

Here we introduce zombie lists for such response trackers. This will
allow us to also handle the case where there are response trackers
on the monitor queue during LNet shutdown. In this instance the
zombie response trackers will be freed when either all the operations
on the MD have completed (this free'ing is performed by
lnet_detach_rsp_tracker()) or after the LND Nets have shutdown since
we are ensured there will not be any more operations on the
associated MDs (this free'ing is performed by
lnet_clean_zombie_rstqs()).

Three other small changes are included in this patch:
 - When deleting the response tracker from the monitor's list we
   should use list_del() rather than list_del_init() since we'll
   be freeing the response tracker after removing it from the list.
 - Perform a single ktime_get() call for each local queue.
 - Move the check of whether the local queue is empty outside of
   the net lock.

Signed-off-by: Chris Horn <hornc@cray.com>
Change-Id: I2a62ceb5b259a094204a1500527443e942483386
Reviewed-on: https://review.whamcloud.com/35576
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Amir Shehata <ashehata@whamcloud.com>
Reviewed-by: Alexandr Boyko <c17825@cray.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-move.c

index 3e7749e..ce84b1a 100644 (file)
@@ -636,6 +636,8 @@ void lnet_return_rx_credits_locked(struct lnet_msg *msg);
 void lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp);
 void lnet_drop_routed_msgs_locked(struct list_head *list, int cpt);
 
 void lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp);
 void lnet_drop_routed_msgs_locked(struct list_head *list, int cpt);
 
+struct list_head **lnet_create_array_of_queues(void);
+
 /* portals functions */
 /* portals attributes */
 static inline int
 /* portals functions */
 /* portals attributes */
 static inline int
@@ -706,6 +708,7 @@ struct lnet_msg *lnet_create_reply_msg(struct lnet_ni *ni,
 void lnet_set_reply_msg_len(struct lnet_ni *ni, struct lnet_msg *msg,
                            unsigned int len);
 void lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt);
 void lnet_set_reply_msg_len(struct lnet_ni *ni, struct lnet_msg *msg,
                            unsigned int len);
 void lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt);
+void lnet_clean_zombie_rstqs(void);
 
 void lnet_finalize(struct lnet_msg *msg, int rc);
 bool lnet_send_error_simulation(struct lnet_msg *msg,
 
 void lnet_finalize(struct lnet_msg *msg, int rc);
 bool lnet_send_error_simulation(struct lnet_msg *msg,
index c3feaea..af591d8 100644 (file)
@@ -1140,6 +1140,13 @@ struct lnet {
         * based on the mdh cookie.
         */
        struct list_head                **ln_mt_rstq;
         * based on the mdh cookie.
         */
        struct list_head                **ln_mt_rstq;
+       /*
+        * A response tracker becomes a zombie when the associated MD is queued
+        * for unlink before the response tracker is detached from the MD. An
+        * entry on a zombie list can be freed when either the remaining
+        * operations on the MD complete or when LNet has shut down.
+        */
+       struct list_head                **ln_mt_zombie_rstqs;
        /* recovery eq handler */
        struct lnet_handle_eq           ln_mt_eqh;
 
        /* recovery eq handler */
        struct lnet_handle_eq           ln_mt_eqh;
 
index d94f9f3..99fda25 100644 (file)
@@ -1100,6 +1100,26 @@ lnet_res_lh_initialize(struct lnet_res_container *rec,
        list_add(&lh->lh_hash_chain, &rec->rec_lh_hash[hash]);
 }
 
        list_add(&lh->lh_hash_chain, &rec->rec_lh_hash[hash]);
 }
 
+struct list_head **
+lnet_create_array_of_queues(void)
+{
+       struct list_head **qs;
+       struct list_head *q;
+       int i;
+
+       qs = cfs_percpt_alloc(lnet_cpt_table(),
+                             sizeof(struct list_head));
+       if (!qs) {
+               CERROR("Failed to allocate queues\n");
+               return NULL;
+       }
+
+       cfs_percpt_for_each(q, i, qs)
+               INIT_LIST_HEAD(q);
+
+       return qs;
+}
+
 static int lnet_unprepare(void);
 
 static int
 static int lnet_unprepare(void);
 
 static int
@@ -1193,6 +1213,12 @@ lnet_prepare(lnet_pid_t requested_pid)
                goto failed;
        }
 
                goto failed;
        }
 
+       the_lnet.ln_mt_zombie_rstqs = lnet_create_array_of_queues();
+       if (!the_lnet.ln_mt_zombie_rstqs) {
+               rc = -ENOMEM;
+               goto failed;
+       }
+
        return 0;
 
  failed:
        return 0;
 
  failed:
@@ -1216,6 +1242,11 @@ lnet_unprepare (void)
        LASSERT(list_empty(&the_lnet.ln_test_peers));
        LASSERT(list_empty(&the_lnet.ln_nets));
 
        LASSERT(list_empty(&the_lnet.ln_test_peers));
        LASSERT(list_empty(&the_lnet.ln_nets));
 
+       if (the_lnet.ln_mt_zombie_rstqs) {
+               lnet_clean_zombie_rstqs();
+               the_lnet.ln_mt_zombie_rstqs = NULL;
+       }
+
        if (!LNetEQHandleIsInvalid(the_lnet.ln_mt_eqh)) {
                rc = LNetEQFree(the_lnet.ln_mt_eqh);
                LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
        if (!LNetEQHandleIsInvalid(the_lnet.ln_mt_eqh)) {
                rc = LNetEQFree(the_lnet.ln_mt_eqh);
                LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
index c1d4e84..49941de 100644 (file)
@@ -2804,25 +2804,57 @@ lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
                return;
 
        rspt = md->md_rspt_ptr;
                return;
 
        rspt = md->md_rspt_ptr;
-       md->md_rspt_ptr = NULL;
 
        /* debug code */
        LASSERT(rspt->rspt_cpt == cpt);
 
 
        /* debug code */
        LASSERT(rspt->rspt_cpt == cpt);
 
-       /*
-        * invalidate the handle to indicate that a response has been
-        * received, which will then lead the monitor thread to clean up
-        * the rspt block.
-        */
-       LNetInvalidateMDHandle(&rspt->rspt_mdh);
+       md->md_rspt_ptr = NULL;
+
+       if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+               /*
+                * The monitor thread has invalidated this handle because the
+                * response timed out, but it failed to lookup the MD. That
+                * means this response tracker is on the zombie list. We can
+                * safely remove it under the resource lock (held by caller) and
+                * free the response tracker block.
+                */
+               list_del(&rspt->rspt_on_list);
+               lnet_rspt_free(rspt, cpt);
+       } else {
+               /*
+                * invalidate the handle to indicate that a response has been
+                * received, which will then lead the monitor thread to clean up
+                * the rspt block.
+                */
+               LNetInvalidateMDHandle(&rspt->rspt_mdh);
+       }
+}
+
+void
+lnet_clean_zombie_rstqs(void)
+{
+       struct lnet_rsp_tracker *rspt, *tmp;
+       int i;
+
+       cfs_cpt_for_each(i, lnet_cpt_table()) {
+               list_for_each_entry_safe(rspt, tmp,
+                                        the_lnet.ln_mt_zombie_rstqs[i],
+                                        rspt_on_list) {
+                       list_del(&rspt->rspt_on_list);
+                       lnet_rspt_free(rspt, i);
+               }
+       }
+
+       cfs_percpt_free(the_lnet.ln_mt_zombie_rstqs);
 }
 
 static void
 }
 
 static void
-lnet_finalize_expired_responses(bool force)
+lnet_finalize_expired_responses(void)
 {
        struct lnet_libmd *md;
        struct list_head local_queue;
        struct lnet_rsp_tracker *rspt, *tmp;
 {
        struct lnet_libmd *md;
        struct list_head local_queue;
        struct lnet_rsp_tracker *rspt, *tmp;
+       ktime_t now;
        int i;
 
        if (the_lnet.ln_mt_rstq == NULL)
        int i;
 
        if (the_lnet.ln_mt_rstq == NULL)
@@ -2839,6 +2871,8 @@ lnet_finalize_expired_responses(bool force)
                list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
                lnet_net_unlock(i);
 
                list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
                lnet_net_unlock(i);
 
+               now = ktime_get();
+
                list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
                        /*
                         * The rspt mdh will be invalidated when a response
                list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
                        /*
                         * The rspt mdh will be invalidated when a response
@@ -2854,41 +2888,73 @@ lnet_finalize_expired_responses(bool force)
                        lnet_res_lock(i);
                        if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
                                lnet_res_unlock(i);
                        lnet_res_lock(i);
                        if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
                                lnet_res_unlock(i);
-                               list_del_init(&rspt->rspt_on_list);
+                               list_del(&rspt->rspt_on_list);
                                lnet_rspt_free(rspt, i);
                                continue;
                        }
 
                                lnet_rspt_free(rspt, i);
                                continue;
                        }
 
-                       if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 ||
-                           force) {
+                       if (ktime_compare(now, rspt->rspt_deadline) >= 0 ||
+                           the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN) {
                                struct lnet_peer_ni *lpni;
                                lnet_nid_t nid;
 
                                md = lnet_handle2md(&rspt->rspt_mdh);
                                if (!md) {
                                struct lnet_peer_ni *lpni;
                                lnet_nid_t nid;
 
                                md = lnet_handle2md(&rspt->rspt_mdh);
                                if (!md) {
+                                       /* MD has been queued for unlink, but
+                                        * rspt hasn't been detached (Note we've
+                                        * checked above that the rspt_mdh is
+                                        * valid). Since we cannot lookup the MD
+                                        * we're unable to detach the rspt
+                                        * ourselves. Thus, move the rspt to the
+                                        * zombie list where we'll wait for
+                                        * either:
+                                        *   1. The remaining operations on the
+                                        *   MD to complete. In this case the
+                                        *   final operation will result in
+                                        *   lnet_msg_detach_md()->
+                                        *   lnet_detach_rsp_tracker() where
+                                        *   we will clean up this response
+                                        *   tracker.
+                                        *   2. LNet to shutdown. In this case
+                                        *   we'll wait until after all LND Nets
+                                        *   have shutdown and then we can
+                                        *   safely free any remaining response
+                                        *   tracker blocks on the zombie list.
+                                        * Note: We need to hold the resource
+                                        * lock when adding to the zombie list
+                                        * because we may have concurrent access
+                                        * with lnet_detach_rsp_tracker().
+                                        */
                                        LNetInvalidateMDHandle(&rspt->rspt_mdh);
                                        LNetInvalidateMDHandle(&rspt->rspt_mdh);
+                                       list_move(&rspt->rspt_on_list,
+                                                 the_lnet.ln_mt_zombie_rstqs[i]);
                                        lnet_res_unlock(i);
                                        lnet_res_unlock(i);
-                                       list_del_init(&rspt->rspt_on_list);
-                                       lnet_rspt_free(rspt, i);
                                        continue;
                                }
                                LASSERT(md->md_rspt_ptr == rspt);
                                md->md_rspt_ptr = NULL;
                                lnet_res_unlock(i);
 
                                        continue;
                                }
                                LASSERT(md->md_rspt_ptr == rspt);
                                md->md_rspt_ptr = NULL;
                                lnet_res_unlock(i);
 
+                               LNetMDUnlink(rspt->rspt_mdh);
+
+                               nid = rspt->rspt_next_hop_nid;
+
+                               list_del(&rspt->rspt_on_list);
+                               lnet_rspt_free(rspt, i);
+
+                               /* If we're shutting down we just want to clean
+                                * up the rspt blocks
+                                */
+                               if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
+                                       continue;
+
                                lnet_net_lock(i);
                                the_lnet.ln_counters[i]->lct_health.lch_response_timeout_count++;
                                lnet_net_unlock(i);
 
                                lnet_net_lock(i);
                                the_lnet.ln_counters[i]->lct_health.lch_response_timeout_count++;
                                lnet_net_unlock(i);
 
-                               list_del_init(&rspt->rspt_on_list);
-
-                               nid = rspt->rspt_next_hop_nid;
-
                                CDEBUG(D_NET,
                                       "Response timeout: md = %p: nid = %s\n",
                                       md, libcfs_nid2str(nid));
                                CDEBUG(D_NET,
                                       "Response timeout: md = %p: nid = %s\n",
                                       md, libcfs_nid2str(nid));
-                               LNetMDUnlink(rspt->rspt_mdh);
-                               lnet_rspt_free(rspt, i);
 
                                /*
                                 * If there is a timeout on the response
 
                                /*
                                 * If there is a timeout on the response
@@ -2908,10 +2974,11 @@ lnet_finalize_expired_responses(bool force)
                        }
                }
 
                        }
                }
 
-               lnet_net_lock(i);
-               if (!list_empty(&local_queue))
+               if (!list_empty(&local_queue)) {
+                       lnet_net_lock(i);
                        list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
                        list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
-               lnet_net_unlock(i);
+                       lnet_net_unlock(i);
+               }
        }
 }
 
        }
 }
 
@@ -3184,26 +3251,6 @@ lnet_recover_local_nis(void)
        lnet_net_unlock(0);
 }
 
        lnet_net_unlock(0);
 }
 
-static struct list_head **
-lnet_create_array_of_queues(void)
-{
-       struct list_head **qs;
-       struct list_head *q;
-       int i;
-
-       qs = cfs_percpt_alloc(lnet_cpt_table(),
-                             sizeof(struct list_head));
-       if (!qs) {
-               CERROR("Failed to allocate queues\n");
-               return NULL;
-       }
-
-       cfs_percpt_for_each(q, i, qs)
-               INIT_LIST_HEAD(q);
-
-       return qs;
-}
-
 static int
 lnet_resendqs_create(void)
 {
 static int
 lnet_resendqs_create(void)
 {
@@ -3468,7 +3515,7 @@ lnet_monitor_thread(void *arg)
                lnet_resend_pending_msgs();
 
                if (now >= rsp_timeout) {
                lnet_resend_pending_msgs();
 
                if (now >= rsp_timeout) {
-                       lnet_finalize_expired_responses(false);
+                       lnet_finalize_expired_responses();
                        rsp_timeout = now + (lnet_transaction_timeout / 2);
                }
 
                        rsp_timeout = now + (lnet_transaction_timeout / 2);
                }
 
@@ -3690,7 +3737,7 @@ lnet_rsp_tracker_create(void)
 static void
 lnet_rsp_tracker_clean(void)
 {
 static void
 lnet_rsp_tracker_clean(void)
 {
-       lnet_finalize_expired_responses(true);
+       lnet_finalize_expired_responses();
 
        cfs_percpt_free(the_lnet.ln_mt_rstq);
        the_lnet.ln_mt_rstq = NULL;
 
        cfs_percpt_free(the_lnet.ln_mt_rstq);
        the_lnet.ln_mt_rstq = NULL;