From: Mr NeilBrown Date: Fri, 20 Dec 2019 00:51:43 +0000 (+1100) Subject: LU-10428 lnet: call event handlers without res_lock X-Git-Tag: 2.13.56~78 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=d05427a7856e8f89cf6ec47f2731e12c6fa22901 LU-10428 lnet: call event handlers without res_lock Currently event handlers are called with the lnet_res_lock() (a spinlock) held. This is a problem if the handler wants to take a mutex, allocate memory, or sleep for some other reason. The lock is needed because handlers for a given md need to be serialized. At the very least, the final event which reports that the md is "unlinked" needs to be called last, after any other events have been handled. Instead of using a spinlock to ensure this ordering, we can use a flag bit in the md. - Before considering whether to send an event we wait for the flag bit to be cleared. This ensures serialization. - Also wait for the flag to be cleared before final freeing of the md. - If this is not an unlink event and we need to call the handler, we set the flag bit before dropping lnet_res_lock(). This ensures the not further events will happen, and that the md won't be freed - so we can still clear the flag. - use wait_var_event to wait for the flag it to be cleared, and wake_up_var() to signal a wakeup. After wait_var_event() returns, we need to take the spinlock and check again. Signed-off-by: Mr NeilBrown Change-Id: I4dada92c4c06547bdc567838d129a8851d7de3bd Reviewed-on: https://review.whamcloud.com/37068 Tested-by: jenkins Reviewed-by: James Simmons Tested-by: Maloo Reviewed-by: Chris Horn Reviewed-by: Oleg Drokin --- diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h index 6bfe88d..a7de9a4 100644 --- a/lnet/include/lnet/lib-lnet.h +++ b/lnet/include/lnet/lib-lnet.h @@ -244,6 +244,33 @@ lnet_ni_set_status(struct lnet_ni *ni, __u32 status) return update; } +static inline void lnet_md_wait_handling(struct lnet_libmd *md, int cpt) +{ + wait_queue_head_t *wq = __var_waitqueue(md); +#ifdef HAVE_WAIT_QUEUE_ENTRY + struct wait_bit_queue_entry entry; + wait_queue_entry_t *wqe = &entry.wq_entry; +#else + struct wait_bit_queue entry; + wait_queue_entry_t *wqe = &entry.wait; +#endif + init_wait_var_entry(&entry, md, 0); + prepare_to_wait_event(wq, wqe, TASK_IDLE); + if (md->md_flags & LNET_MD_FLAG_HANDLING) { + /* Race with unlocked call to ->md_handler. + * It is safe to drop the res_lock here as the + * caller has only just claimed it. + */ + lnet_res_unlock(cpt); + schedule(); + /* Cannot check md now, it might be freed. Caller + * must reclaim reference and check. + */ + lnet_res_lock(cpt); + } + finish_wait(wq, wqe); +} + static inline void lnet_md_free(struct lnet_libmd *md) { diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index c4b6222..1a71059 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -217,6 +217,15 @@ struct lnet_libmd { #define LNET_MD_FLAG_ZOMBIE BIT(0) #define LNET_MD_FLAG_AUTO_UNLINK BIT(1) #define LNET_MD_FLAG_ABORTED BIT(2) +/* LNET_MD_FLAG_HANDLING is set when a non-unlink event handler + * is being called for an event relating to the md. + * It ensures only one such handler runs at a time. + * The final "unlink" event is only called once the + * md_refcount has reached zero, and this flag has been cleared, + * ensuring that it doesn't race with any other event handler + * call. + */ +#define LNET_MD_FLAG_HANDLING BIT(3) struct lnet_test_peer { /* info about peers we are trying to fail */ diff --git a/lnet/lnet/lib-md.c b/lnet/lnet/lib-md.c index 61db468..bc10b4a 100644 --- a/lnet/lnet/lib-md.c +++ b/lnet/lnet/lib-md.c @@ -70,6 +70,7 @@ lnet_md_unlink(struct lnet_libmd *md) LASSERT(!list_empty(&md->md_list)); list_del_init(&md->md_list); + LASSERT(!(md->md_flags & LNET_MD_FLAG_HANDLING)); lnet_md_free(md); } @@ -473,18 +474,26 @@ int LNetMDUnlink(struct lnet_handle_md mdh) { struct lnet_event ev; - struct lnet_libmd *md; + struct lnet_libmd *md = NULL; + lnet_handler_t handler = NULL; int cpt; LASSERT(the_lnet.ln_refcount > 0); cpt = lnet_cpt_of_cookie(mdh.cookie); lnet_res_lock(cpt); - - md = lnet_handle2md(&mdh); - if (md == NULL) { - lnet_res_unlock(cpt); - return -ENOENT; + while (!md) { + md = lnet_handle2md(&mdh); + if (!md) { + lnet_res_unlock(cpt); + return -ENOENT; + } + if (md->md_refcount == 0 && + md->md_flags & LNET_MD_FLAG_HANDLING) { + /* Race with unlocked call to ->md_handler. */ + lnet_md_wait_handling(md, cpt); + md = NULL; + } } md->md_flags |= LNET_MD_FLAG_ABORTED; @@ -493,7 +502,7 @@ LNetMDUnlink(struct lnet_handle_md mdh) * unlinked. Otherwise, we enqueue an event now... */ if (md->md_handler && md->md_refcount == 0) { lnet_build_unlink_event(md, &ev); - md->md_handler(&ev); + handler = md->md_handler; } if (md->md_rspt_ptr != NULL) @@ -502,6 +511,10 @@ LNetMDUnlink(struct lnet_handle_md mdh) lnet_md_unlink(md); lnet_res_unlock(cpt); + + if (handler) + handler(&ev); + return 0; } EXPORT_SYMBOL(LNetMDUnlink); diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 8ab8194..a6302d5 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -942,11 +942,20 @@ lnet_health_check(struct lnet_msg *msg) } static void -lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status) +lnet_msg_detach_md(struct lnet_msg *msg, int status) { struct lnet_libmd *md = msg->msg_md; + lnet_handler_t handler = NULL; + int cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie); int unlink; + lnet_res_lock(cpt); + while (md->md_flags & LNET_MD_FLAG_HANDLING) + /* An event handler is running - wait for it to + * complete to avoid races. + */ + lnet_md_wait_handling(md, cpt); + /* Now it's safe to drop my caller's ref */ md->md_refcount--; LASSERT(md->md_refcount >= 0); @@ -960,17 +969,30 @@ lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status) msg->msg_ev.status = status; } msg->msg_ev.unlinked = unlink; - md->md_handler(&msg->msg_ev); + handler = md->md_handler; + if (!unlink) + md->md_flags |= LNET_MD_FLAG_HANDLING; } if (unlink || (md->md_refcount == 0 && md->md_threshold == LNET_MD_THRESH_INF)) lnet_detach_rsp_tracker(md, cpt); + msg->msg_md = NULL; if (unlink) lnet_md_unlink(md); - msg->msg_md = NULL; + lnet_res_unlock(cpt); + + if (handler) { + handler(&msg->msg_ev); + if (!unlink) { + lnet_res_lock(cpt); + md->md_flags &= ~LNET_MD_FLAG_HANDLING; + wake_up_var(md); + lnet_res_unlock(cpt); + } + } } static bool @@ -1106,12 +1128,8 @@ lnet_finalize(struct lnet_msg *msg, int status) * We're not going to resend this message so detach its MD and invoke * the appropriate callbacks */ - if (msg->msg_md != NULL) { - cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie); - lnet_res_lock(cpt); - lnet_msg_detach_md(msg, cpt, status); - lnet_res_unlock(cpt); - } + if (msg->msg_md != NULL) + lnet_msg_detach_md(msg, status); again: if (!msg->msg_tx_committed && !msg->msg_rx_committed) {