From ad9243693c9a5a5b2c34165ad853ddf5ceec4617 Mon Sep 17 00:00:00 2001 From: Amir Shehata Date: Sat, 6 Jul 2019 09:02:33 -0700 Subject: [PATCH] LU-12402 lnet: handle recursion in resend When we're resending a message we have to decommit it first. This could potentially result in another message being picked up from the queue and sent, which could fail immediately and be finalized, causing recursion. This problem was observed when a router was being shutdown. This patch uses the same mechanism used in lnet_finalize() to limit recursion. If a thread is already finalizing a message and it gets into path where it starts finalizing a second, then that message is queued and handled later. Signed-off-by: Amir Shehata Change-Id: I0cb943473fc8c22573d98da63a99cf7d678d4f42 Reviewed-on: https://review.whamcloud.com/35431 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Chris Horn Reviewed-by: Alexandr Boyko Reviewed-by: Olaf Weber Reviewed-by: Oleg Drokin --- lnet/include/lnet/lib-types.h | 4 + lnet/lnet/lib-msg.c | 300 ++++++++++++++++++++++++++++-------------- 2 files changed, 202 insertions(+), 102 deletions(-) diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index af591d8..ca93161 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -972,9 +972,13 @@ struct lnet_msg_container { int msc_nfinalizers; /* msgs waiting to complete finalizing */ struct list_head msc_finalizing; + /* msgs waiting to be resent */ + struct list_head msc_resending; struct list_head msc_active; /* active message list */ /* threads doing finalization */ void **msc_finalizers; + /* threads doing resends */ + void **msc_resenders; }; /* Peer Discovery states */ diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 19c9e77..416cf42 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -593,6 +593,174 @@ lnet_incr_hstats(struct lnet_msg *msg, enum lnet_msg_hstatus hstatus) } } +static void +lnet_resend_msg_locked(struct lnet_msg *msg) +{ + msg->msg_retry_count++; + + /* + * remove message from the active list and reset it to prepare + * for a resend. Two exceptions to this + * + * 1. the router case. When a message is being routed it is + * committed for rx when received and committed for tx when + * forwarded. We don't want to remove it from the active list, since + * code which handles receiving expects it to remain on the active + * list. + * + * 2. The REPLY case. Reply messages use the same message + * structure for the GET that was received. + */ + if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) { + list_del_init(&msg->msg_activelist); + msg->msg_onactivelist = 0; + } + /* + * The msg_target.nid which was originally set + * when calling LNetGet() or LNetPut() might've + * been overwritten if we're routing this message. + * Call lnet_msg_decommit_tx() to return the credit + * this message consumed. The message will + * consume another credit when it gets resent. + */ + msg->msg_target.nid = msg->msg_hdr.dest_nid; + lnet_msg_decommit_tx(msg, -EAGAIN); + msg->msg_sending = 0; + msg->msg_receiving = 0; + msg->msg_target_is_router = 0; + + CDEBUG(D_NET, "%s->%s:%s:%s - queuing msg (%p) for resend\n", + libcfs_nid2str(msg->msg_hdr.src_nid), + libcfs_nid2str(msg->msg_hdr.dest_nid), + lnet_msgtyp2str(msg->msg_type), + lnet_health_error2str(msg->msg_health_status), msg); + + list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]); + + wake_up(&the_lnet.ln_mt_waitq); +} + +int +lnet_check_finalize_recursion_locked(struct lnet_msg *msg, + struct list_head *containerq, + int nworkers, void **workers) +{ + int my_slot = -1; + int i; + + list_add_tail(&msg->msg_list, containerq); + + for (i = 0; i < nworkers; i++) { + if (workers[i] == current) + break; + + if (my_slot < 0 && workers[i] == NULL) + my_slot = i; + } + + if (i < nworkers || my_slot < 0) + return -1; + + workers[my_slot] = current; + + return my_slot; +} + +int +lnet_attempt_msg_resend(struct lnet_msg *msg) +{ + struct lnet_msg_container *container; + int my_slot; + int cpt; + + /* we can only resend tx_committed messages */ + LASSERT(msg->msg_tx_committed); + + /* don't resend recovery messages */ + if (msg->msg_recovery) { + CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n", + libcfs_nid2str(msg->msg_from), + libcfs_nid2str(msg->msg_target.nid), + msg->msg_retry_count); + return -ENOTRECOVERABLE; + } + + /* + * if we explicitly indicated we don't want to resend then just + * return + */ + if (msg->msg_no_resend) { + CDEBUG(D_NET, "msg %s->%s requested no resend. retry# %d\n", + libcfs_nid2str(msg->msg_from), + libcfs_nid2str(msg->msg_target.nid), + msg->msg_retry_count); + return -ENOTRECOVERABLE; + } + + /* check if the message has exceeded the number of retries */ + if (msg->msg_retry_count >= lnet_retry_count) { + CNETERR("msg %s->%s exceeded retry count %d\n", + libcfs_nid2str(msg->msg_from), + libcfs_nid2str(msg->msg_target.nid), + msg->msg_retry_count); + return -ENOTRECOVERABLE; + } + + cpt = msg->msg_tx_cpt; + lnet_net_lock(cpt); + + /* check again under lock */ + if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) { + lnet_net_unlock(cpt); + return -ESHUTDOWN; + } + + container = the_lnet.ln_msg_containers[cpt]; + my_slot = + lnet_check_finalize_recursion_locked(msg, + &container->msc_resending, + container->msc_nfinalizers, + container->msc_resenders); + + /* enough threads are resending */ + if (my_slot == -1) { + lnet_net_unlock(cpt); + return 0; + } + + while (!list_empty(&container->msc_resending)) { + msg = list_entry(container->msc_resending.next, + struct lnet_msg, msg_list); + list_del(&msg->msg_list); + + /* + * resending the message will require us to call + * lnet_msg_decommit_tx() which will return the credit + * which this message holds. This could trigger another + * queued message to be sent. If that message fails and + * requires a resend we will recurse. + * But since at this point the slot is taken, the message + * will be queued in the container and dealt with + * later. This breaks the recursion. + */ + lnet_resend_msg_locked(msg); + } + + /* + * msc_resenders is an array of process pointers. Each entry holds + * a pointer to the current process operating on the message. An + * array entry is created per CPT. If the array slot is already + * set, then it means that there is a thread on the CPT currently + * resending a message. + * Once the thread finishes clear the slot to enable the thread to + * take on more resend work. + */ + container->msc_resenders[my_slot] = NULL; + lnet_net_unlock(cpt); + + return 0; +} + /* * Do a health check on the message: * return -1 if we're not going to handle the error or @@ -604,9 +772,9 @@ static int lnet_health_check(struct lnet_msg *msg) { enum lnet_msg_hstatus hstatus = msg->msg_health_status; - bool lo = false; - struct lnet_ni *ni; struct lnet_peer_ni *lpni; + struct lnet_ni *ni; + bool lo = false; /* if we're shutting down no point in handling health. */ if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) @@ -700,7 +868,7 @@ lnet_health_check(struct lnet_msg *msg) lnet_handle_local_failure(ni); if (msg->msg_tx_committed) /* add to the re-send queue */ - goto resend; + return lnet_attempt_msg_resend(msg); break; /* @@ -718,7 +886,7 @@ lnet_health_check(struct lnet_msg *msg) case LNET_MSG_STATUS_REMOTE_DROPPED: lnet_handle_remote_failure(lpni); if (msg->msg_tx_committed) - goto resend; + return lnet_attempt_msg_resend(msg); break; case LNET_MSG_STATUS_REMOTE_ERROR: @@ -730,89 +898,8 @@ lnet_health_check(struct lnet_msg *msg) LBUG(); } -resend: - /* we can only resend tx_committed messages */ - LASSERT(msg->msg_tx_committed); - - /* don't resend recovery messages */ - if (msg->msg_recovery) { - CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n", - libcfs_nid2str(msg->msg_from), - libcfs_nid2str(msg->msg_target.nid), - msg->msg_retry_count); - return -1; - } - - /* - * if we explicitly indicated we don't want to resend then just - * return - */ - if (msg->msg_no_resend) { - CDEBUG(D_NET, "msg %s->%s requested no resend. retry# %d\n", - libcfs_nid2str(msg->msg_from), - libcfs_nid2str(msg->msg_target.nid), - msg->msg_retry_count); - return -1; - } - - /* check if the message has exceeded the number of retries */ - if (msg->msg_retry_count >= lnet_retry_count) { - CNETERR("msg %s->%s exceeded retry count %d\n", - libcfs_nid2str(msg->msg_from), - libcfs_nid2str(msg->msg_target.nid), - msg->msg_retry_count); - return -1; - } - msg->msg_retry_count++; - - lnet_net_lock(msg->msg_tx_cpt); - - /* check again under lock */ - if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) { - lnet_net_unlock(msg->msg_tx_cpt); - return -1; - } - - /* - * remove message from the active list and reset it in preparation - * for a resend. Two exception to this - * - * 1. the router case, whe a message is committed for rx when - * received, then tx when it is sent. When committed to both tx and - * rx we don't want to remove it from the active list. - * - * 2. The REPLY case since it uses the same msg block for the GET - * that was received. - */ - if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) { - list_del_init(&msg->msg_activelist); - msg->msg_onactivelist = 0; - } - /* - * The msg_target.nid which was originally set - * when calling LNetGet() or LNetPut() might've - * been overwritten if we're routing this message. - * Call lnet_return_tx_credits_locked() to return - * the credit this message consumed. The message will - * consume another credit when it gets resent. - */ - msg->msg_target.nid = msg->msg_hdr.dest_nid; - lnet_msg_decommit_tx(msg, -EAGAIN); - msg->msg_sending = 0; - msg->msg_receiving = 0; - msg->msg_target_is_router = 0; - - CDEBUG(D_NET, "%s->%s:%s:%s - queuing for resend\n", - libcfs_nid2str(msg->msg_hdr.src_nid), - libcfs_nid2str(msg->msg_hdr.dest_nid), - lnet_msgtyp2str(msg->msg_type), - lnet_health_error2str(hstatus)); - - list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]); - lnet_net_unlock(msg->msg_tx_cpt); - - wake_up(&the_lnet.ln_mt_waitq); - return 0; + /* no resend is needed */ + return -1; } static void @@ -951,7 +1038,6 @@ lnet_finalize(struct lnet_msg *msg, int status) int my_slot; int cpt; int rc; - int i; LASSERT(!in_interrupt()); @@ -974,7 +1060,6 @@ lnet_finalize(struct lnet_msg *msg, int status) * put on the resend queue. */ if (!lnet_health_check(msg)) - /* Message is queued for resend */ return; } @@ -1006,27 +1091,20 @@ again: lnet_net_lock(cpt); container = the_lnet.ln_msg_containers[cpt]; - list_add_tail(&msg->msg_list, &container->msc_finalizing); /* Recursion breaker. Don't complete the message here if I am (or * enough other threads are) already completing messages */ + my_slot = lnet_check_finalize_recursion_locked(msg, + &container->msc_finalizing, + container->msc_nfinalizers, + container->msc_finalizers); - my_slot = -1; - for (i = 0; i < container->msc_nfinalizers; i++) { - if (container->msc_finalizers[i] == current) - break; - - if (my_slot < 0 && container->msc_finalizers[i] == NULL) - my_slot = i; - } - - if (i < container->msc_nfinalizers || my_slot < 0) { + /* enough threads are resending */ + if (my_slot == -1) { lnet_net_unlock(cpt); return; } - container->msc_finalizers[my_slot] = current; - rc = 0; while (!list_empty(&container->msc_finalizing)) { msg = list_entry(container->msc_finalizing.next, @@ -1084,6 +1162,13 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container) sizeof(*container->msc_finalizers)); container->msc_finalizers = NULL; } + + if (container->msc_resenders != NULL) { + LIBCFS_FREE(container->msc_resenders, + container->msc_nfinalizers * + sizeof(*container->msc_resenders)); + container->msc_resenders = NULL; + } container->msc_init = 0; } @@ -1096,6 +1181,7 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt) INIT_LIST_HEAD(&container->msc_active); INIT_LIST_HEAD(&container->msc_finalizing); + INIT_LIST_HEAD(&container->msc_resending); /* number of CPUs */ container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt); @@ -1112,6 +1198,16 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt) return -ENOMEM; } + LIBCFS_CPT_ALLOC(container->msc_resenders, lnet_cpt_table(), cpt, + container->msc_nfinalizers * + sizeof(*container->msc_resenders)); + + if (container->msc_resenders == NULL) { + CERROR("Failed to allocate message resenders\n"); + lnet_msg_container_cleanup(container); + return -ENOMEM; + } + return rc; } -- 1.8.3.1