Whamcloud - gitweb
LU-10428 lnet: call event handlers without res_lock 68/37068/23
authorMr NeilBrown <neilb@suse.de>
Fri, 20 Dec 2019 00:51:43 +0000 (11:51 +1100)
committerOleg Drokin <green@whamcloud.com>
Tue, 1 Sep 2020 03:42:58 +0000 (03:42 +0000)
Currently event handlers are called with the lnet_res_lock()
(a spinlock) held.  This is a problem if the handler wants
to take a mutex, allocate memory, or sleep for some other
reason.

The lock is needed because handlers for a given md need to
be serialized.  At the very least, the final event which
reports that the md is "unlinked" needs to be called last,
after any other events have been handled.

Instead of using a spinlock to ensure this ordering, we can
use a flag bit in the md.

- Before considering whether to send an event we wait for the flag bit
  to be cleared.  This ensures serialization.
- Also wait for the flag to be cleared before final freeing of the md.
- If this is not an unlink event and we need to call the handler, we
  set the flag bit before dropping lnet_res_lock().  This
  ensures the not further events will happen, and that the md
  won't be freed - so we can still clear the flag.
- use wait_var_event to wait for the flag it to be cleared,
  and wake_up_var() to signal a wakeup.  After wait_var_event()
  returns, we need to take the spinlock and check again.

Signed-off-by: Mr NeilBrown <neilb@suse.de>
Change-Id: I4dada92c4c06547bdc567838d129a8851d7de3bd
Reviewed-on: https://review.whamcloud.com/37068
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Chris Horn <chris.horn@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/lib-md.c
lnet/lnet/lib-msg.c

index 6bfe88d..a7de9a4 100644 (file)
@@ -244,6 +244,33 @@ lnet_ni_set_status(struct lnet_ni *ni, __u32 status)
        return update;
 }
 
        return update;
 }
 
+static inline void lnet_md_wait_handling(struct lnet_libmd *md, int cpt)
+{
+       wait_queue_head_t *wq = __var_waitqueue(md);
+#ifdef HAVE_WAIT_QUEUE_ENTRY
+       struct wait_bit_queue_entry entry;
+       wait_queue_entry_t *wqe = &entry.wq_entry;
+#else
+       struct wait_bit_queue entry;
+       wait_queue_entry_t *wqe = &entry.wait;
+#endif
+       init_wait_var_entry(&entry, md, 0);
+       prepare_to_wait_event(wq, wqe, TASK_IDLE);
+       if (md->md_flags & LNET_MD_FLAG_HANDLING) {
+               /* Race with unlocked call to ->md_handler.
+                * It is safe to drop the res_lock here as the
+                * caller has only just claimed it.
+                */
+               lnet_res_unlock(cpt);
+               schedule();
+               /* Cannot check md now, it might be freed.  Caller
+                * must reclaim reference and check.
+                */
+               lnet_res_lock(cpt);
+       }
+       finish_wait(wq, wqe);
+}
+
 static inline void
 lnet_md_free(struct lnet_libmd *md)
 {
 static inline void
 lnet_md_free(struct lnet_libmd *md)
 {
index c4b6222..1a71059 100644 (file)
@@ -217,6 +217,15 @@ struct lnet_libmd {
 #define LNET_MD_FLAG_ZOMBIE     BIT(0)
 #define LNET_MD_FLAG_AUTO_UNLINK BIT(1)
 #define LNET_MD_FLAG_ABORTED    BIT(2)
 #define LNET_MD_FLAG_ZOMBIE     BIT(0)
 #define LNET_MD_FLAG_AUTO_UNLINK BIT(1)
 #define LNET_MD_FLAG_ABORTED    BIT(2)
+/* LNET_MD_FLAG_HANDLING is set when a non-unlink event handler
+ * is being called for an event relating to the md.
+ * It ensures only one such handler runs at a time.
+ * The final "unlink" event is only called once the
+ * md_refcount has reached zero, and this flag has been cleared,
+ * ensuring that it doesn't race with any other event handler
+ * call.
+ */
+#define LNET_MD_FLAG_HANDLING   BIT(3)
 
 struct lnet_test_peer {
        /* info about peers we are trying to fail */
 
 struct lnet_test_peer {
        /* info about peers we are trying to fail */
index 61db468..bc10b4a 100644 (file)
@@ -70,6 +70,7 @@ lnet_md_unlink(struct lnet_libmd *md)
 
        LASSERT(!list_empty(&md->md_list));
        list_del_init(&md->md_list);
 
        LASSERT(!list_empty(&md->md_list));
        list_del_init(&md->md_list);
+       LASSERT(!(md->md_flags & LNET_MD_FLAG_HANDLING));
        lnet_md_free(md);
 }
 
        lnet_md_free(md);
 }
 
@@ -473,18 +474,26 @@ int
 LNetMDUnlink(struct lnet_handle_md mdh)
 {
        struct lnet_event ev;
 LNetMDUnlink(struct lnet_handle_md mdh)
 {
        struct lnet_event ev;
-       struct lnet_libmd *md;
+       struct lnet_libmd *md = NULL;
+       lnet_handler_t handler = NULL;
        int cpt;
 
        LASSERT(the_lnet.ln_refcount > 0);
 
        cpt = lnet_cpt_of_cookie(mdh.cookie);
        lnet_res_lock(cpt);
        int cpt;
 
        LASSERT(the_lnet.ln_refcount > 0);
 
        cpt = lnet_cpt_of_cookie(mdh.cookie);
        lnet_res_lock(cpt);
-
-       md = lnet_handle2md(&mdh);
-       if (md == NULL) {
-               lnet_res_unlock(cpt);
-               return -ENOENT;
+       while (!md) {
+               md = lnet_handle2md(&mdh);
+               if (!md) {
+                       lnet_res_unlock(cpt);
+                       return -ENOENT;
+               }
+               if (md->md_refcount == 0 &&
+                   md->md_flags & LNET_MD_FLAG_HANDLING) {
+                       /* Race with unlocked call to ->md_handler. */
+                       lnet_md_wait_handling(md, cpt);
+                       md = NULL;
+               }
        }
 
        md->md_flags |= LNET_MD_FLAG_ABORTED;
        }
 
        md->md_flags |= LNET_MD_FLAG_ABORTED;
@@ -493,7 +502,7 @@ LNetMDUnlink(struct lnet_handle_md mdh)
         * unlinked. Otherwise, we enqueue an event now... */
        if (md->md_handler && md->md_refcount == 0) {
                lnet_build_unlink_event(md, &ev);
         * unlinked. Otherwise, we enqueue an event now... */
        if (md->md_handler && md->md_refcount == 0) {
                lnet_build_unlink_event(md, &ev);
-               md->md_handler(&ev);
+               handler = md->md_handler;
        }
 
        if (md->md_rspt_ptr != NULL)
        }
 
        if (md->md_rspt_ptr != NULL)
@@ -502,6 +511,10 @@ LNetMDUnlink(struct lnet_handle_md mdh)
        lnet_md_unlink(md);
 
        lnet_res_unlock(cpt);
        lnet_md_unlink(md);
 
        lnet_res_unlock(cpt);
+
+       if (handler)
+               handler(&ev);
+
        return 0;
 }
 EXPORT_SYMBOL(LNetMDUnlink);
        return 0;
 }
 EXPORT_SYMBOL(LNetMDUnlink);
index 8ab8194..a6302d5 100644 (file)
@@ -942,11 +942,20 @@ lnet_health_check(struct lnet_msg *msg)
 }
 
 static void
 }
 
 static void
-lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status)
+lnet_msg_detach_md(struct lnet_msg *msg, int status)
 {
        struct lnet_libmd *md = msg->msg_md;
 {
        struct lnet_libmd *md = msg->msg_md;
+       lnet_handler_t handler = NULL;
+       int cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
        int unlink;
 
        int unlink;
 
+       lnet_res_lock(cpt);
+       while (md->md_flags & LNET_MD_FLAG_HANDLING)
+               /* An event handler is running - wait for it to
+                * complete to avoid races.
+                */
+               lnet_md_wait_handling(md, cpt);
+
        /* Now it's safe to drop my caller's ref */
        md->md_refcount--;
        LASSERT(md->md_refcount >= 0);
        /* Now it's safe to drop my caller's ref */
        md->md_refcount--;
        LASSERT(md->md_refcount >= 0);
@@ -960,17 +969,30 @@ lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status)
                        msg->msg_ev.status   = status;
                }
                msg->msg_ev.unlinked = unlink;
                        msg->msg_ev.status   = status;
                }
                msg->msg_ev.unlinked = unlink;
-               md->md_handler(&msg->msg_ev);
+               handler = md->md_handler;
+               if (!unlink)
+                       md->md_flags |= LNET_MD_FLAG_HANDLING;
        }
 
        if (unlink || (md->md_refcount == 0 &&
                       md->md_threshold == LNET_MD_THRESH_INF))
                lnet_detach_rsp_tracker(md, cpt);
 
        }
 
        if (unlink || (md->md_refcount == 0 &&
                       md->md_threshold == LNET_MD_THRESH_INF))
                lnet_detach_rsp_tracker(md, cpt);
 
+       msg->msg_md = NULL;
        if (unlink)
                lnet_md_unlink(md);
 
        if (unlink)
                lnet_md_unlink(md);
 
-       msg->msg_md = NULL;
+       lnet_res_unlock(cpt);
+
+       if (handler) {
+               handler(&msg->msg_ev);
+               if (!unlink) {
+                       lnet_res_lock(cpt);
+                       md->md_flags &= ~LNET_MD_FLAG_HANDLING;
+                       wake_up_var(md);
+                       lnet_res_unlock(cpt);
+               }
+       }
 }
 
 static bool
 }
 
 static bool
@@ -1106,12 +1128,8 @@ lnet_finalize(struct lnet_msg *msg, int status)
         * We're not going to resend this message so detach its MD and invoke
         * the appropriate callbacks
         */
         * We're not going to resend this message so detach its MD and invoke
         * the appropriate callbacks
         */
-       if (msg->msg_md != NULL) {
-               cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
-               lnet_res_lock(cpt);
-               lnet_msg_detach_md(msg, cpt, status);
-               lnet_res_unlock(cpt);
-       }
+       if (msg->msg_md != NULL)
+               lnet_msg_detach_md(msg, status);
 
 again:
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
 
 again:
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {