From: Amir Shehata
Date: Fri, 29 Jun 2018 01:02:42 +0000 (-0700)
Subject: LU-9120 lnet: timeout delayed REPLYs and ACKs
X-Git-Tag: 2.11.55~65^2^2~15
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=refs%2Fchanges%2F71%2F32771%2F15;p=fs%2Flustre-release.git

LU-9120 lnet: timeout delayed REPLYs and ACKs

When a GET, or a PUT that requires an ACK, is sent, add a response
tracker block on a percpt queue. When the REPLY or ACK is received,
remove the block from the percpt queue. The monitor thread wakes up
periodically to check whether any of the blocks have expired; if so,
it sends a timeout event to the ULP, flags the MD as stale, and then
unlinks it.

Test-Parameters: forbuildonly
Signed-off-by: Amir Shehata
Change-Id: Ia219fca5a578d625819b9f9c8ee2b3aa050dce80
Reviewed-on: https://review.whamcloud.com/32771
Tested-by: Jenkins
Reviewed-by: Olaf Weber
Reviewed-by: Sonia Sharma
---

diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h
index 0af6d32..a6799fa 100644
--- a/lnet/include/lnet/lib-lnet.h
+++ b/lnet/include/lnet/lib-lnet.h
@@ -490,6 +490,24 @@ lnet_msg_free(struct lnet_msg *msg)
 	LIBCFS_FREE(msg, sizeof(*msg));
 }
 
+static inline struct lnet_rsp_tracker *
+lnet_rspt_alloc(int cpt)
+{
+	struct lnet_rsp_tracker *rspt;
+	LIBCFS_ALLOC(rspt, sizeof(*rspt));
+	lnet_net_lock(cpt);
+	lnet_net_unlock(cpt);
+	return rspt;
+}
+
+static inline void
+lnet_rspt_free(struct lnet_rsp_tracker *rspt, int cpt)
+{
+	LIBCFS_FREE(rspt, sizeof(*rspt));
+	lnet_net_lock(cpt);
+	lnet_net_unlock(cpt);
+}
+
 void lnet_ni_free(struct lnet_ni *ni);
 void lnet_net_free(struct lnet_net *net);
 
@@ -682,6 +700,7 @@ struct lnet_msg *lnet_create_reply_msg(struct lnet_ni *ni,
 				       struct lnet_msg *get_msg);
 void lnet_set_reply_msg_len(struct lnet_ni *ni, struct lnet_msg *msg,
 			    unsigned int len);
+void lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt);
 void lnet_finalize(struct lnet_msg *msg, int rc);
 
diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h
index 3c36ebf..e921595 100644
--- a/lnet/include/lnet/lib-types.h
+++ b/lnet/include/lnet/lib-types.h
@@ -78,6 +78,17 @@ enum lnet_msg_hstatus {
 	LNET_MSG_STATUS_NETWORK_TIMEOUT
 };
 
+struct lnet_rsp_tracker {
+	/* chain on the waiting list */
+	struct list_head	rspt_on_list;
+	/* cpt to lock */
+	int			rspt_cpt;
+	/* deadline of the REPLY/ACK */
+	ktime_t			rspt_deadline;
+	/* parent MD */
+	struct lnet_handle_md	rspt_mdh;
+};
+
 struct lnet_msg {
 	struct list_head	msg_activelist;
 	struct list_head	msg_list;	/* Q for credits/MD */
@@ -191,24 +202,25 @@ struct lnet_me {
 };
 
 struct lnet_libmd {
-	struct list_head	md_list;
-	struct lnet_libhandle	md_lh;
-	struct lnet_me		*md_me;
-	char			*md_start;
-	unsigned int		md_offset;
-	unsigned int		md_length;
-	unsigned int		md_max_size;
-	int			md_threshold;
-	int			md_refcount;
-	unsigned int		md_options;
-	unsigned int		md_flags;
-	unsigned int		md_niov;	/* # frags at end of struct */
-	void			*md_user_ptr;
-	struct lnet_eq		*md_eq;
-	struct lnet_handle_md	md_bulk_handle;
+	struct list_head	 md_list;
+	struct lnet_libhandle	 md_lh;
+	struct lnet_me		*md_me;
+	char			*md_start;
+	unsigned int		 md_offset;
+	unsigned int		 md_length;
+	unsigned int		 md_max_size;
+	int			 md_threshold;
+	int			 md_refcount;
+	unsigned int		 md_options;
+	unsigned int		 md_flags;
+	unsigned int		 md_niov;	/* # frags at end of struct */
+	void			*md_user_ptr;
+	struct lnet_rsp_tracker *md_rspt_ptr;
+	struct lnet_eq		*md_eq;
+	struct lnet_handle_md	 md_bulk_handle;
 	union {
-		struct kvec	iov[LNET_MAX_IOV];
-		lnet_kiov_t	kiov[LNET_MAX_IOV];
+		struct kvec	 iov[LNET_MAX_IOV];
+		lnet_kiov_t	 kiov[LNET_MAX_IOV];
 	} md_iov;
 };
 
@@ -1085,6 +1097,14 @@ struct lnet {
 	struct list_head		ln_mt_localNIRecovq;
 	/* local NIs to recover */
 	struct list_head		ln_mt_peerNIRecovq;
+	/*
+	 * An array of queues for GET/PUT waiting for REPLY/ACK respectively.
+	 * There are CPT number of queues. Since response trackers will be
+	 * added on the fast path we can't afford to grab the exclusive
+	 * net lock to protect these queues. The CPT will be calculated
+	 * based on the mdh cookie.
+	 */
+	struct list_head		**ln_mt_rstq;
 	/* recovery eq handler */
 	struct lnet_handle_eq		ln_mt_eqh;
 
diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c
index 12ac396..b960864 100644
--- a/lnet/lnet/lib-move.c
+++ b/lnet/lnet/lib-move.c
@@ -2653,6 +2653,110 @@ struct lnet_mt_event_info {
 	lnet_nid_t mt_nid;
 };
 
+void
+lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
+{
+	struct lnet_rsp_tracker *rspt;
+
+	/*
+	 * msg has a refcount on the MD so the MD is not going away.
+	 * The rspt queue for the cpt is protected by
+	 * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
+	 */
+	lnet_res_lock(cpt);
+	if (!md->md_rspt_ptr) {
+		lnet_res_unlock(cpt);
+		return;
+	}
+	rspt = md->md_rspt_ptr;
+	md->md_rspt_ptr = NULL;
+
+	/* debug code */
+	LASSERT(rspt->rspt_cpt == cpt);
+
+	/*
+	 * invalidate the handle to indicate that a response has been
+	 * received, which will then lead the monitor thread to clean up
+	 * the rspt block.
+	 */
+	LNetInvalidateMDHandle(&rspt->rspt_mdh);
+	lnet_res_unlock(cpt);
+}
+
+static void
+lnet_finalize_expired_responses(bool force)
+{
+	struct lnet_libmd *md;
+	struct list_head local_queue;
+	struct lnet_rsp_tracker *rspt, *tmp;
+	int i;
+
+	if (the_lnet.ln_mt_rstq == NULL)
+		return;
+
+	cfs_cpt_for_each(i, lnet_cpt_table()) {
+		INIT_LIST_HEAD(&local_queue);
+
+		lnet_net_lock(i);
+		if (!the_lnet.ln_mt_rstq[i]) {
+			lnet_net_unlock(i);
+			continue;
+		}
+		list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
+		lnet_net_unlock(i);
+
+		list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
+			/*
+			 * The rspt mdh will be invalidated when a response
+			 * is received or whenever we want to discard the
+			 * block the monitor thread will walk the queue
+			 * and clean up any rsts with an invalid mdh.
+			 * The monitor thread will walk the queue until
+			 * the first unexpired rspt block. This means that
+			 * some rspt blocks which received their
+			 * corresponding responses will linger in the
+			 * queue until they are cleaned up eventually.
+			 */
+			lnet_res_lock(i);
+			if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+				lnet_res_unlock(i);
+				list_del_init(&rspt->rspt_on_list);
+				lnet_rspt_free(rspt, i);
+				continue;
+			}
+
+			if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 ||
+			    force) {
+				md = lnet_handle2md(&rspt->rspt_mdh);
+				if (!md) {
+					LNetInvalidateMDHandle(&rspt->rspt_mdh);
+					lnet_res_unlock(i);
+					list_del_init(&rspt->rspt_on_list);
+					lnet_rspt_free(rspt, i);
+					continue;
+				}
+				LASSERT(md->md_rspt_ptr == rspt);
+				md->md_rspt_ptr = NULL;
+				lnet_res_unlock(i);
+
+				list_del_init(&rspt->rspt_on_list);
+
+				CDEBUG(D_NET, "Response timed out: md = %p\n", md);
+				LNetMDUnlink(rspt->rspt_mdh);
+				lnet_rspt_free(rspt, i);
+			} else {
+				lnet_res_unlock(i);
+				break;
+			}
+		}
+
+		lnet_net_lock(i);
+		if (!list_empty(&local_queue))
+			list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
+		lnet_net_unlock(i);
+	}
+}
+
 static void
 lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
 {
@@ -3146,6 +3250,8 @@ lnet_recover_peer_nis(void)
 static int
 lnet_monitor_thread(void *arg)
 {
+	int wakeup_counter = 0;
+
 	/*
 	 * The monitor thread takes care of the following:
 	 * 1. Checks the aliveness of routers
@@ -3164,6 +3270,12 @@
 
 		lnet_resend_pending_msgs();
 
+		wakeup_counter++;
+		if (wakeup_counter >= lnet_transaction_timeout / 2) {
+			lnet_finalize_expired_responses(false);
+			wakeup_counter = 0;
+		}
+
 		lnet_recover_local_nis();
 		lnet_recover_peer_nis();
 
@@ -3348,6 +3460,29 @@ lnet_mt_event_handler(struct lnet_event *event)
 	}
 }
 
+static int
+lnet_rsp_tracker_create(void)
+{
+	struct list_head **rstqs;
+	rstqs = lnet_create_array_of_queues();
+
+	if (!rstqs)
+		return -ENOMEM;
+
+	the_lnet.ln_mt_rstq = rstqs;
+
+	return 0;
+}
+
+static void
+lnet_rsp_tracker_clean(void)
+{
+	lnet_finalize_expired_responses(true);
+
+	cfs_percpt_free(the_lnet.ln_mt_rstq);
+	the_lnet.ln_mt_rstq = NULL;
+}
+
 int lnet_monitor_thr_start(void)
 {
 	int rc = 0;
@@ -3360,6 +3495,10 @@ int lnet_monitor_thr_start(void)
 	if (rc)
 		return rc;
 
+	rc = lnet_rsp_tracker_create();
+	if (rc)
+		goto clean_queues;
+
 	rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh);
 	if (rc != 0) {
 		CERROR("Can't allocate monitor thread EQ: %d\n", rc);
@@ -3394,6 +3533,7 @@ clean_thread:
 	lnet_router_cleanup();
 free_mem:
 	the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+	lnet_rsp_tracker_clean();
 	lnet_clean_local_ni_recoveryq();
 	lnet_clean_peer_ni_recoveryq();
 	lnet_clean_resendqs();
@@ -3401,6 +3541,7 @@ free_mem:
 	LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
 	return rc;
 clean_queues:
+	lnet_rsp_tracker_clean();
 	lnet_clean_local_ni_recoveryq();
 	lnet_clean_peer_ni_recoveryq();
 	lnet_clean_resendqs();
@@ -3426,6 +3567,7 @@ void lnet_monitor_thr_stop(void)
 
 	/* perform cleanup tasks */
 	lnet_router_cleanup();
+	lnet_rsp_tracker_clean();
 	lnet_clean_local_ni_recoveryq();
 	lnet_clean_peer_ni_recoveryq();
 	lnet_clean_resendqs();
@@ -3598,13 +3740,13 @@ lnet_parse_get(struct lnet_ni *ni, struct lnet_msg *msg, int rdma_get)
 static int
 lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg)
 {
-	void			*private = msg->msg_private;
-	struct lnet_hdr		*hdr = &msg->msg_hdr;
+	void *private = msg->msg_private;
+	struct lnet_hdr *hdr = &msg->msg_hdr;
 	struct lnet_process_id src = {0};
-	struct lnet_libmd	*md;
-	int			rlength;
-	int			mlength;
-	int			cpt;
+	struct lnet_libmd *md;
+	int rlength;
+	int mlength;
+	int cpt;
 
 	cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
 	lnet_res_lock(cpt);
@@ -3665,10 +3807,10 @@ lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg)
 static int
 lnet_parse_ack(struct lnet_ni *ni, struct lnet_msg *msg)
 {
-	struct lnet_hdr		*hdr = &msg->msg_hdr;
+	struct lnet_hdr *hdr = &msg->msg_hdr;
 	struct lnet_process_id src = {0};
-	struct lnet_libmd	*md;
-	int			cpt;
+	struct lnet_libmd *md;
+	int cpt;
 
 	src.nid = hdr->src_nid;
 	src.pid = hdr->src_pid;
@@ -4178,6 +4320,43 @@ lnet_recv_delayed_msg_list(struct list_head *head)
 	}
 }
 
+static void
+lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt,
+			struct lnet_libmd *md, struct lnet_handle_md mdh)
+{
+	s64 timeout_ns;
+
+	/*
+	 * MD has a refcount taken by message so it's not going away.
+	 * The MD however can be looked up. We need to secure the access
+	 * to the md_rspt_ptr by taking the res_lock.
+	 * The rspt can be accessed without protection up to when it gets
+	 * added to the list.
+	 */
+
+	/* debug code */
+	LASSERT(md->md_rspt_ptr == NULL);
+
+	/* we'll use that same event in case we never get a response */
+	rspt->rspt_mdh = mdh;
+	rspt->rspt_cpt = cpt;
+	timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
+	rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns);
+
+	lnet_res_lock(cpt);
+	/* store the rspt so we can access it when we get the REPLY */
+	md->md_rspt_ptr = rspt;
+	lnet_res_unlock(cpt);
+
+	/*
+	 * add to the list of tracked responses. It's added to tail of the
+	 * list in order to expire all the older entries first.
+	 */
+	lnet_net_lock(cpt);
+	list_add_tail(&rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]);
+	lnet_net_unlock(cpt);
+}
+
 /**
  * Initiate an asynchronous PUT operation.
  *
@@ -4228,10 +4407,11 @@
 LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
 	__u64 match_bits, unsigned int offset,
 	__u64 hdr_data)
 {
-	struct lnet_msg		*msg;
-	struct lnet_libmd	*md;
-	int			cpt;
-	int			rc;
+	struct lnet_msg *msg;
+	struct lnet_libmd *md;
+	int cpt;
+	int rc;
+	struct lnet_rsp_tracker *rspt = NULL;
 
 	LASSERT(the_lnet.ln_refcount > 0);
 
@@ -4251,6 +4431,17 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
 	msg->msg_vmflush = !!memory_pressure_get();
 
 	cpt = lnet_cpt_of_cookie(mdh.cookie);
+
+	if (ack == LNET_ACK_REQ) {
+		rspt = lnet_rspt_alloc(cpt);
+		if (!rspt) {
+			CERROR("Dropping PUT to %s: ENOMEM on response tracker\n",
+			       libcfs_id2str(target));
+			return -ENOMEM;
+		}
+		INIT_LIST_HEAD(&rspt->rspt_on_list);
+	}
+
 	lnet_res_lock(cpt);
 
 	md = lnet_handle2md(&mdh);
@@ -4263,6 +4454,7 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
 			md->md_me->me_portal);
 		lnet_res_unlock(cpt);
 
+		LIBCFS_FREE(rspt, sizeof(*rspt));
 		lnet_msg_free(msg);
 		return -ENOENT;
 	}
@@ -4295,11 +4487,15 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
 
 	lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
+	if (ack == LNET_ACK_REQ)
+		lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
+
 	rc = lnet_send(self, msg, LNET_NID_ANY);
 	if (rc != 0) {
 		CNETERR("Error sending PUT to %s: %d\n",
 			libcfs_id2str(target), rc);
 		msg->msg_no_resend = true;
+		lnet_detach_rsp_tracker(msg->msg_md, cpt);
 		lnet_finalize(msg, rc);
 	}
 
@@ -4431,10 +4627,11 @@
 LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
 	struct lnet_process_id target, unsigned int portal,
 	__u64 match_bits, unsigned int offset, bool recovery)
 {
-	struct lnet_msg		*msg;
-	struct lnet_libmd	*md;
-	int			cpt;
-	int			rc;
+	struct lnet_msg *msg;
+	struct lnet_libmd *md;
+	struct lnet_rsp_tracker *rspt;
+	int cpt;
+	int rc;
 
 	LASSERT(the_lnet.ln_refcount > 0);
 
@@ -4447,15 +4644,24 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
 	}
 
 	msg = lnet_msg_alloc();
-	if (msg == NULL) {
+	if (!msg) {
CERROR("Dropping GET to %s: ENOMEM on struct lnet_msg\n", libcfs_id2str(target)); return -ENOMEM; } + cpt = lnet_cpt_of_cookie(mdh.cookie); + + rspt = lnet_rspt_alloc(cpt); + if (!rspt) { + CERROR("Dropping GET to %s: ENOMEM on response tracker\n", + libcfs_id2str(target)); + return -ENOMEM; + } + INIT_LIST_HEAD(&rspt->rspt_on_list); + msg->msg_recovery = recovery; - cpt = lnet_cpt_of_cookie(mdh.cookie); lnet_res_lock(cpt); md = lnet_handle2md(&mdh); @@ -4470,6 +4676,7 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh, lnet_res_unlock(cpt); lnet_msg_free(msg); + LIBCFS_FREE(rspt, sizeof(*rspt)); return -ENOENT; } @@ -4494,11 +4701,14 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh, lnet_build_msg_event(msg, LNET_EVENT_SEND); + lnet_attach_rsp_tracker(rspt, cpt, md, mdh); + rc = lnet_send(self, msg, LNET_NID_ANY); if (rc < 0) { CNETERR("Error sending GET to %s: %d\n", libcfs_id2str(target), rc); msg->msg_no_resend = true; + lnet_detach_rsp_tracker(msg->msg_md, cpt); lnet_finalize(msg, rc); } diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 8de5211..fb87c50 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -780,6 +780,16 @@ lnet_finalize(struct lnet_msg *msg, int status) msg->msg_ev.status = status; + /* + * if this is an ACK or a REPLY then make sure to remove the + * response tracker. + */ + if (msg->msg_ev.type == LNET_EVENT_REPLY || + msg->msg_ev.type == LNET_EVENT_ACK) { + cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie); + lnet_detach_rsp_tracker(msg->msg_md, cpt); + } + /* if the message is successfully sent, no need to keep the MD around */ if (msg->msg_md != NULL && !status) lnet_detach_md(msg, status);