}
static void
-lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status)
+lnet_msg_detach_md(struct lnet_msg *msg, int status)
{
struct lnet_libmd *md = msg->msg_md;
+ lnet_handler_t handler = NULL;
+ int cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
int unlink;
+ lnet_res_lock(cpt);
+ while (md->md_flags & LNET_MD_FLAG_HANDLING)
+ /* An event handler is running - wait for it to
+ * complete to avoid races.
+ */
+ lnet_md_wait_handling(md, cpt);
+
/* Now it's safe to drop my caller's ref */
md->md_refcount--;
LASSERT(md->md_refcount >= 0);
msg->msg_ev.status = status;
}
msg->msg_ev.unlinked = unlink;
- md->md_handler(&msg->msg_ev);
+ handler = md->md_handler;
+ if (!unlink)
+ md->md_flags |= LNET_MD_FLAG_HANDLING;
}
if (unlink || (md->md_refcount == 0 &&
md->md_threshold == LNET_MD_THRESH_INF))
lnet_detach_rsp_tracker(md, cpt);
+ msg->msg_md = NULL;
if (unlink)
lnet_md_unlink(md);
- msg->msg_md = NULL;
+ lnet_res_unlock(cpt);
+
+ if (handler) {
+ handler(&msg->msg_ev);
+ if (!unlink) {
+ lnet_res_lock(cpt);
+ md->md_flags &= ~LNET_MD_FLAG_HANDLING;
+ wake_up_var(md);
+ lnet_res_unlock(cpt);
+ }
+ }
}
static bool
* We're not going to resend this message so detach its MD and invoke
* the appropriate callbacks
*/
- if (msg->msg_md != NULL) {
- cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
- lnet_res_lock(cpt);
- lnet_msg_detach_md(msg, cpt, status);
- lnet_res_unlock(cpt);
- }
+ if (msg->msg_md != NULL)
+ lnet_msg_detach_md(msg, status);
again:
if (!msg->msg_tx_committed && !msg->msg_rx_committed) {