From: Amir Shehata Date: Wed, 5 Dec 2018 21:57:11 +0000 (-0800) Subject: LU-11734 lnet: handle multi-md usage X-Git-Tag: 2.12.0-RC2~4 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F94%2F33794%2F2;p=fs%2Flustre-release.git LU-11734 lnet: handle multi-md usage The MD can be used multiple times. The response tracker needs to have the same lifespan as the MD. If we re-use the MD and a response tracker has already been attached to it, then we'll update the deadline for the response tracker. This means the deadline on the MD is for its last user. Signed-off-by: Amir Shehata Change-Id: I681630c3d599f66c007926525708e3004b343455 Reviewed-on: https://review.whamcloud.com/33794 Tested-by: Jenkins Reviewed-by: James Simmons Reviewed-by: Olaf Weber Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h index 0299a21..c99d49b 100644 --- a/lnet/include/lnet/lib-lnet.h +++ b/lnet/include/lnet/lib-lnet.h @@ -604,7 +604,6 @@ int lnet_islocalnet(__u32 net); void lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md, unsigned int offset, unsigned int mlen); -void lnet_msg_detach_md(struct lnet_msg *msg, int status); void lnet_build_unlink_event(struct lnet_libmd *md, struct lnet_event *ev); void lnet_build_msg_event(struct lnet_msg *msg, enum lnet_event_kind ev_type); void lnet_msg_commit(struct lnet_msg *msg, int cpt); diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c index e217e95..6d2ac7a 100644 --- a/lnet/lnet/lib-move.c +++ b/lnet/lnet/lib-move.c @@ -2673,6 +2673,7 @@ struct lnet_mt_event_info { lnet_nid_t mt_nid; }; +/* called with res_lock held */ void lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt) { @@ -2683,11 +2684,9 @@ lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt) * The rspt queue for the cpt is protected by * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie. */ - lnet_res_lock(cpt); - if (!md->md_rspt_ptr) { - lnet_res_unlock(cpt); + if (!md->md_rspt_ptr) return; - } + rspt = md->md_rspt_ptr; md->md_rspt_ptr = NULL; @@ -2700,7 +2699,6 @@ lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt) * the rspt block. */ LNetInvalidateMDHandle(&rspt->rspt_mdh); - lnet_res_unlock(cpt); } static void @@ -4416,6 +4414,8 @@ lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt, struct lnet_libmd *md, struct lnet_handle_md mdh) { s64 timeout_ns; + bool new_entry = true; + struct lnet_rsp_tracker *local_rspt; /* * MD has a refcount taken by message so it's not going away. @@ -4425,27 +4425,36 @@ lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt, * added to the list. */ - /* debug code */ - LASSERT(md->md_rspt_ptr == NULL); - - /* we'll use that same event in case we never get a response */ - rspt->rspt_mdh = mdh; - rspt->rspt_cpt = cpt; - timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC; - rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns); - lnet_res_lock(cpt); - /* store the rspt so we can access it when we get the REPLY */ - md->md_rspt_ptr = rspt; - lnet_res_unlock(cpt); + local_rspt = md->md_rspt_ptr; + timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC; + if (local_rspt != NULL) { + /* + * we already have an rspt attached to the md, so we'll + * update the deadline on that one. + */ + LIBCFS_FREE(rspt, sizeof(*rspt)); + new_entry = false; + } else { + /* new md */ + rspt->rspt_mdh = mdh; + rspt->rspt_cpt = cpt; + /* store the rspt so we can access it when we get the REPLY */ + md->md_rspt_ptr = rspt; + local_rspt = rspt; + } + local_rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns); /* * add to the list of tracked responses. It's added to tail of the * list in order to expire all the older entries first. */ lnet_net_lock(cpt); - list_add_tail(&rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]); + if (!new_entry && !list_empty(&local_rspt->rspt_on_list)) + list_del_init(&local_rspt->rspt_on_list); + list_add_tail(&local_rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]); lnet_net_unlock(cpt); + lnet_res_unlock(cpt); } /** @@ -4586,7 +4595,6 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack, CNETERR("Error sending PUT to %s: %d\n", libcfs_id2str(target), rc); msg->msg_no_resend = true; - lnet_detach_rsp_tracker(msg->msg_md, cpt); lnet_finalize(msg, rc); } @@ -4800,7 +4808,6 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh, CNETERR("Error sending GET to %s: %d\n", libcfs_id2str(target), rc); msg->msg_no_resend = true; - lnet_detach_rsp_tracker(msg->msg_md, cpt); lnet_finalize(msg, rc); } diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 8ad9185..6b0af10 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -363,29 +363,6 @@ lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md, lnet_md_deconstruct(md, &msg->msg_ev.md); } -void -lnet_msg_detach_md(struct lnet_msg *msg, int status) -{ - struct lnet_libmd *md = msg->msg_md; - int unlink; - - /* Now it's safe to drop my caller's ref */ - md->md_refcount--; - LASSERT(md->md_refcount >= 0); - - unlink = lnet_md_unlinkable(md); - if (md->md_eq != NULL) { - msg->msg_ev.status = status; - msg->msg_ev.unlinked = unlink; - lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev); - } - - if (unlink) - lnet_md_unlink(md); - - msg->msg_md = NULL; -} - static int lnet_complete_msg_locked(struct lnet_msg *msg, int cpt) { @@ -775,12 +752,43 @@ resend: } static void +lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status) +{ + struct lnet_libmd *md = msg->msg_md; + int unlink; + + /* Now it's safe to drop my caller's ref */ + md->md_refcount--; + LASSERT(md->md_refcount >= 0); + + unlink = lnet_md_unlinkable(md); + if (md->md_eq != NULL) { + msg->msg_ev.status = status; + msg->msg_ev.unlinked = unlink; + lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev); + } + + if (unlink) { + /* + * if this is an ACK or a REPLY then make sure to remove the + * response tracker. + */ + if (msg->msg_ev.type == LNET_EVENT_REPLY || + msg->msg_ev.type == LNET_EVENT_ACK) + lnet_detach_rsp_tracker(msg->msg_md, cpt); + lnet_md_unlink(md); + } + + msg->msg_md = NULL; +} + +static void lnet_detach_md(struct lnet_msg *msg, int status) { int cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie); lnet_res_lock(cpt); - lnet_msg_detach_md(msg, status); + lnet_msg_detach_md(msg, cpt, status); lnet_res_unlock(cpt); } @@ -882,16 +890,6 @@ lnet_finalize(struct lnet_msg *msg, int status) msg->msg_ev.status = status; - /* - * if this is an ACK or a REPLY then make sure to remove the - * response tracker. - */ - if (msg->msg_ev.type == LNET_EVENT_REPLY || - msg->msg_ev.type == LNET_EVENT_ACK) { - cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie); - lnet_detach_rsp_tracker(msg->msg_md, cpt); - } - /* if the message is successfully sent, no need to keep the MD around */ if (msg->msg_md != NULL && !status) lnet_detach_md(msg, status);