Whamcloud - gitweb
LU-11734 lnet: handle multi-md usage 94/33794/2
authorAmir Shehata <ashehata@whamcloud.com>
Wed, 5 Dec 2018 21:57:11 +0000 (13:57 -0800)
committerOleg Drokin <green@whamcloud.com>
Sat, 8 Dec 2018 05:33:54 +0000 (05:33 +0000)
The MD can be used multiple times. The response tracker needs to have
the same lifespan as the MD. If we re-use the MD and a response
tracker has already been attached to it, then we'll update the
deadline for the response tracker. This means the deadline on the MD
is for its last user.

Signed-off-by: Amir Shehata <ashehata@whamcloud.com>
Change-Id: I681630c3d599f66c007926525708e3004b343455
Reviewed-on: https://review.whamcloud.com/33794
Tested-by: Jenkins
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Olaf Weber <olaf.weber@hpe.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c

index 0299a21..c99d49b 100644 (file)
@@ -604,7 +604,6 @@ int lnet_islocalnet(__u32 net);
 
 void lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md,
                        unsigned int offset, unsigned int mlen);
 
 void lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md,
                        unsigned int offset, unsigned int mlen);
-void lnet_msg_detach_md(struct lnet_msg *msg, int status);
 void lnet_build_unlink_event(struct lnet_libmd *md, struct lnet_event *ev);
 void lnet_build_msg_event(struct lnet_msg *msg, enum lnet_event_kind ev_type);
 void lnet_msg_commit(struct lnet_msg *msg, int cpt);
 void lnet_build_unlink_event(struct lnet_libmd *md, struct lnet_event *ev);
 void lnet_build_msg_event(struct lnet_msg *msg, enum lnet_event_kind ev_type);
 void lnet_msg_commit(struct lnet_msg *msg, int cpt);
index e217e95..6d2ac7a 100644 (file)
@@ -2673,6 +2673,7 @@ struct lnet_mt_event_info {
        lnet_nid_t mt_nid;
 };
 
        lnet_nid_t mt_nid;
 };
 
+/* called with res_lock held */
 void
 lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
 {
 void
 lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
 {
@@ -2683,11 +2684,9 @@ lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
         * The rspt queue for the cpt is protected by
         * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
         */
         * The rspt queue for the cpt is protected by
         * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
         */
-       lnet_res_lock(cpt);
-       if (!md->md_rspt_ptr) {
-               lnet_res_unlock(cpt);
+       if (!md->md_rspt_ptr)
                return;
                return;
-       }
+
        rspt = md->md_rspt_ptr;
        md->md_rspt_ptr = NULL;
 
        rspt = md->md_rspt_ptr;
        md->md_rspt_ptr = NULL;
 
@@ -2700,7 +2699,6 @@ lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
         * the rspt block.
         */
        LNetInvalidateMDHandle(&rspt->rspt_mdh);
         * the rspt block.
         */
        LNetInvalidateMDHandle(&rspt->rspt_mdh);
-       lnet_res_unlock(cpt);
 }
 
 static void
 }
 
 static void
@@ -4416,6 +4414,8 @@ lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt,
                        struct lnet_libmd *md, struct lnet_handle_md mdh)
 {
        s64 timeout_ns;
                        struct lnet_libmd *md, struct lnet_handle_md mdh)
 {
        s64 timeout_ns;
+       bool new_entry = true;
+       struct lnet_rsp_tracker *local_rspt;
 
        /*
         * MD has a refcount taken by message so it's not going away.
 
        /*
         * MD has a refcount taken by message so it's not going away.
@@ -4425,27 +4425,36 @@ lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt,
         * added to the list.
         */
 
         * added to the list.
         */
 
-       /* debug code */
-       LASSERT(md->md_rspt_ptr == NULL);
-
-       /* we'll use that same event in case we never get a response  */
-       rspt->rspt_mdh = mdh;
-       rspt->rspt_cpt = cpt;
-       timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
-       rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns);
-
        lnet_res_lock(cpt);
        lnet_res_lock(cpt);
-       /* store the rspt so we can access it when we get the REPLY */
-       md->md_rspt_ptr = rspt;
-       lnet_res_unlock(cpt);
+       local_rspt = md->md_rspt_ptr;
+       timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
+       if (local_rspt != NULL) {
+               /*
+                * we already have an rspt attached to the md, so we'll
+                * update the deadline on that one.
+                */
+               LIBCFS_FREE(rspt, sizeof(*rspt));
+               new_entry = false;
+       } else {
+               /* new md */
+               rspt->rspt_mdh = mdh;
+               rspt->rspt_cpt = cpt;
+               /* store the rspt so we can access it when we get the REPLY */
+               md->md_rspt_ptr = rspt;
+               local_rspt = rspt;
+       }
+       local_rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns);
 
        /*
         * add to the list of tracked responses. It's added to tail of the
         * list in order to expire all the older entries first.
         */
        lnet_net_lock(cpt);
 
        /*
         * add to the list of tracked responses. It's added to tail of the
         * list in order to expire all the older entries first.
         */
        lnet_net_lock(cpt);
-       list_add_tail(&rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]);
+       if (!new_entry && !list_empty(&local_rspt->rspt_on_list))
+               list_del_init(&local_rspt->rspt_on_list);
+       list_add_tail(&local_rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]);
        lnet_net_unlock(cpt);
        lnet_net_unlock(cpt);
+       lnet_res_unlock(cpt);
 }
 
 /**
 }
 
 /**
@@ -4586,7 +4595,6 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
                CNETERR("Error sending PUT to %s: %d\n",
                        libcfs_id2str(target), rc);
                msg->msg_no_resend = true;
                CNETERR("Error sending PUT to %s: %d\n",
                        libcfs_id2str(target), rc);
                msg->msg_no_resend = true;
-               lnet_detach_rsp_tracker(msg->msg_md, cpt);
                lnet_finalize(msg, rc);
        }
 
                lnet_finalize(msg, rc);
        }
 
@@ -4800,7 +4808,6 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
                CNETERR("Error sending GET to %s: %d\n",
                        libcfs_id2str(target), rc);
                msg->msg_no_resend = true;
                CNETERR("Error sending GET to %s: %d\n",
                        libcfs_id2str(target), rc);
                msg->msg_no_resend = true;
-               lnet_detach_rsp_tracker(msg->msg_md, cpt);
                lnet_finalize(msg, rc);
        }
 
                lnet_finalize(msg, rc);
        }
 
index 8ad9185..6b0af10 100644 (file)
@@ -363,29 +363,6 @@ lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md,
        lnet_md_deconstruct(md, &msg->msg_ev.md);
 }
 
        lnet_md_deconstruct(md, &msg->msg_ev.md);
 }
 
-void
-lnet_msg_detach_md(struct lnet_msg *msg, int status)
-{
-       struct lnet_libmd *md = msg->msg_md;
-       int unlink;
-
-       /* Now it's safe to drop my caller's ref */
-       md->md_refcount--;
-       LASSERT(md->md_refcount >= 0);
-
-       unlink = lnet_md_unlinkable(md);
-       if (md->md_eq != NULL) {
-               msg->msg_ev.status   = status;
-               msg->msg_ev.unlinked = unlink;
-               lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
-       }
-
-       if (unlink)
-               lnet_md_unlink(md);
-
-       msg->msg_md = NULL;
-}
-
 static int
 lnet_complete_msg_locked(struct lnet_msg *msg, int cpt)
 {
 static int
 lnet_complete_msg_locked(struct lnet_msg *msg, int cpt)
 {
@@ -775,12 +752,43 @@ resend:
 }
 
 static void
 }
 
 static void
+lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status)
+{
+       struct lnet_libmd *md = msg->msg_md;
+       int unlink;
+
+       /* Now it's safe to drop my caller's ref */
+       md->md_refcount--;
+       LASSERT(md->md_refcount >= 0);
+
+       unlink = lnet_md_unlinkable(md);
+       if (md->md_eq != NULL) {
+               msg->msg_ev.status   = status;
+               msg->msg_ev.unlinked = unlink;
+               lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
+       }
+
+       if (unlink) {
+               /*
+                * if this is an ACK or a REPLY then make sure to remove the
+                * response tracker.
+                */
+               if (msg->msg_ev.type == LNET_EVENT_REPLY ||
+                   msg->msg_ev.type == LNET_EVENT_ACK)
+                       lnet_detach_rsp_tracker(msg->msg_md, cpt);
+               lnet_md_unlink(md);
+       }
+
+       msg->msg_md = NULL;
+}
+
+static void
 lnet_detach_md(struct lnet_msg *msg, int status)
 {
        int cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
 
        lnet_res_lock(cpt);
 lnet_detach_md(struct lnet_msg *msg, int status)
 {
        int cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
 
        lnet_res_lock(cpt);
-       lnet_msg_detach_md(msg, status);
+       lnet_msg_detach_md(msg, cpt, status);
        lnet_res_unlock(cpt);
 }
 
        lnet_res_unlock(cpt);
 }
 
@@ -882,16 +890,6 @@ lnet_finalize(struct lnet_msg *msg, int status)
 
        msg->msg_ev.status = status;
 
 
        msg->msg_ev.status = status;
 
-       /*
-        * if this is an ACK or a REPLY then make sure to remove the
-        * response tracker.
-        */
-       if (msg->msg_ev.type == LNET_EVENT_REPLY ||
-           msg->msg_ev.type == LNET_EVENT_ACK) {
-               cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
-               lnet_detach_rsp_tracker(msg->msg_md, cpt);
-       }
-
        /* if the message is successfully sent, no need to keep the MD around */
        if (msg->msg_md != NULL && !status)
                lnet_detach_md(msg, status);
        /* if the message is successfully sent, no need to keep the MD around */
        if (msg->msg_md != NULL && !status)
                lnet_detach_md(msg, status);