From 41ed1c18082435624dc5a391511a5ff40ec79979 Mon Sep 17 00:00:00 2001 From: Amir Shehata Date: Sat, 25 Apr 2020 10:19:32 -0700 Subject: [PATCH] LU-12402 lnet: handle recursion in resend When we're resending a message we have to decommit it first. This could potentially result in another message being picked up from the queue and sent, which could fail immediately and be finalized, causing recursion. This problem was observed when a router was being shut down. This patch uses the same mechanism used in lnet_finalize() to limit recursion. If a thread is already finalizing a message and it gets into a path where it starts finalizing a second, then that message is queued and handled later. Lustre-change: https://review.whamcloud.com/35431 Lustre-commit: ad9243693c9a5a5b2c34165ad853ddf5ceec4617 Signed-off-by: Amir Shehata Change-Id: Iace64c7ddb1f56a0a63b030df6a5ab103ae6c645 Reviewed-on: https://review.whamcloud.com/38367 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Serguei Smirnov Reviewed-by: Oleg Drokin --- lnet/include/lnet/lib-types.h | 4 + lnet/lnet/lib-msg.c | 293 ++++++++++++++++++++++++++++-------------- 2 files changed, 200 insertions(+), 97 deletions(-) diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index 8771481..c653167 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -967,9 +967,13 @@ struct lnet_msg_container { int msc_nfinalizers; /* msgs waiting to complete finalizing */ struct list_head msc_finalizing; + /* msgs waiting to be resent */ + struct list_head msc_resending; struct list_head msc_active; /* active message list */ /* threads doing finalization */ void **msc_finalizers; + /* threads doing resends */ + void **msc_resenders; }; /* Peer Discovery states */ diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 55b8d80..8822d15 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -586,6 +586,174 @@ lnet_incr_hstats(struct lnet_msg *msg, enum lnet_msg_hstatus hstatus) } }
+static void +lnet_resend_msg_locked(struct lnet_msg *msg) +{ + msg->msg_retry_count++; + + /* + * remove message from the active list and reset it to prepare + * for a resend. Two exceptions to this + * + * 1. the router case. When a message is being routed it is + * committed for rx when received and committed for tx when + * forwarded. We don't want to remove it from the active list, since + * code which handles receiving expects it to remain on the active + * list. + * + * 2. The REPLY case. Reply messages use the same message + * structure for the GET that was received. + */ + if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) { + list_del_init(&msg->msg_activelist); + msg->msg_onactivelist = 0; + } + /* + * The msg_target.nid which was originally set + * when calling LNetGet() or LNetPut() might've + * been overwritten if we're routing this message. + * Call lnet_msg_decommit_tx() to return the credit + * this message consumed. The message will + * consume another credit when it gets resent. 
+ */ + msg->msg_target.nid = msg->msg_hdr.dest_nid; + lnet_msg_decommit_tx(msg, -EAGAIN); + msg->msg_sending = 0; + msg->msg_receiving = 0; + msg->msg_target_is_router = 0; + + CDEBUG(D_NET, "%s->%s:%s:%s - queuing msg (%p) for resend\n", + libcfs_nid2str(msg->msg_hdr.src_nid), + libcfs_nid2str(msg->msg_hdr.dest_nid), + lnet_msgtyp2str(msg->msg_type), + lnet_health_error2str(msg->msg_health_status), msg); + + list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]); + + wake_up(&the_lnet.ln_mt_waitq); +} + +int +lnet_check_finalize_recursion_locked(struct lnet_msg *msg, + struct list_head *containerq, + int nworkers, void **workers) +{ + int my_slot = -1; + int i; + + list_add_tail(&msg->msg_list, containerq); + + for (i = 0; i < nworkers; i++) { + if (workers[i] == current) + break; + + if (my_slot < 0 && workers[i] == NULL) + my_slot = i; + } + + if (i < nworkers || my_slot < 0) + return -1; + + workers[my_slot] = current; + + return my_slot; +} + +int +lnet_attempt_msg_resend(struct lnet_msg *msg) +{ + struct lnet_msg_container *container; + int my_slot; + int cpt; + + /* we can only resend tx_committed messages */ + LASSERT(msg->msg_tx_committed); + + /* don't resend recovery messages */ + if (msg->msg_recovery) { + CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n", + libcfs_nid2str(msg->msg_from), + libcfs_nid2str(msg->msg_target.nid), + msg->msg_retry_count); + return -ENOTRECOVERABLE; + } + + /* + * if we explicitly indicated we don't want to resend then just + * return + */ + if (msg->msg_no_resend) { + CDEBUG(D_NET, "msg %s->%s requested no resend. 
retry# %d\n", + libcfs_nid2str(msg->msg_from), + libcfs_nid2str(msg->msg_target.nid), + msg->msg_retry_count); + return -ENOTRECOVERABLE; + } + + /* check if the message has exceeded the number of retries */ + if (msg->msg_retry_count >= lnet_retry_count) { + CNETERR("msg %s->%s exceeded retry count %d\n", + libcfs_nid2str(msg->msg_from), + libcfs_nid2str(msg->msg_target.nid), + msg->msg_retry_count); + return -ENOTRECOVERABLE; + } + + cpt = msg->msg_tx_cpt; + lnet_net_lock(cpt); + + /* check again under lock */ + if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) { + lnet_net_unlock(cpt); + return -ESHUTDOWN; + } + + container = the_lnet.ln_msg_containers[cpt]; + my_slot = + lnet_check_finalize_recursion_locked(msg, + &container->msc_resending, + container->msc_nfinalizers, + container->msc_resenders); + + /* enough threads are resending */ + if (my_slot == -1) { + lnet_net_unlock(cpt); + return 0; + } + + while (!list_empty(&container->msc_resending)) { + msg = list_entry(container->msc_resending.next, + struct lnet_msg, msg_list); + list_del(&msg->msg_list); + + /* + * resending the message will require us to call + * lnet_msg_decommit_tx() which will return the credit + * which this message holds. This could trigger another + * queued message to be sent. If that message fails and + * requires a resend we will recurse. + * But since at this point the slot is taken, the message + * will be queued in the container and dealt with + * later. This breaks the recursion. + */ + lnet_resend_msg_locked(msg); + } + + /* + * msc_resenders is an array of process pointers. Each entry holds + * a pointer to the current process operating on the message. An + * array entry is created per CPT. If the array slot is already + * set, then it means that there is a thread on the CPT currently + * resending a message. + * Once the thread finishes clear the slot to enable the thread to + * take on more resend work. 
+ */ + container->msc_resenders[my_slot] = NULL; + lnet_net_unlock(cpt); + + return 0; +} + /* * Do a health check on the message: * return -1 if we're not going to handle the error or @@ -653,7 +821,7 @@ lnet_health_check(struct lnet_msg *msg) case LNET_MSG_STATUS_LOCAL_TIMEOUT: lnet_handle_local_failure(msg); /* add to the re-send queue */ - goto resend; + return lnet_attempt_msg_resend(msg); /* * These errors will not trigger a resend so simply @@ -669,7 +837,7 @@ lnet_health_check(struct lnet_msg *msg) */ case LNET_MSG_STATUS_REMOTE_DROPPED: lnet_handle_remote_failure(msg->msg_txpeer); - goto resend; + return lnet_attempt_msg_resend(msg); case LNET_MSG_STATUS_REMOTE_ERROR: case LNET_MSG_STATUS_REMOTE_TIMEOUT: @@ -680,86 +848,8 @@ lnet_health_check(struct lnet_msg *msg) LBUG(); } -resend: - /* don't resend recovery messages */ - if (msg->msg_recovery) { - CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n", - libcfs_nid2str(msg->msg_from), - libcfs_nid2str(msg->msg_target.nid), - msg->msg_retry_count); - return -1; - } - - /* - * if we explicitly indicated we don't want to resend then just - * return - */ - if (msg->msg_no_resend) { - CDEBUG(D_NET, "msg %s->%s requested no resend. retry# %d\n", - libcfs_nid2str(msg->msg_from), - libcfs_nid2str(msg->msg_target.nid), - msg->msg_retry_count); - return -1; - } - - /* check if the message has exceeded the number of retries */ - if (msg->msg_retry_count >= lnet_retry_count) { - CNETERR("msg %s->%s exceeded retry count %d\n", - libcfs_nid2str(msg->msg_from), - libcfs_nid2str(msg->msg_target.nid), - msg->msg_retry_count); - return -1; - } - msg->msg_retry_count++; - - lnet_net_lock(msg->msg_tx_cpt); - - /* check again under lock */ - if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) { - lnet_net_unlock(msg->msg_tx_cpt); - return -1; - } - - /* - * remove message from the active list and reset it in preparation - * for a resend. Two exception to this - * - * 1. 
the router case, whe a message is committed for rx when - * received, then tx when it is sent. When committed to both tx and - * rx we don't want to remove it from the active list. - * - * 2. The REPLY case since it uses the same msg block for the GET - * that was received. - */ - if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) { - list_del_init(&msg->msg_activelist); - msg->msg_onactivelist = 0; - } - /* - * The msg_target.nid which was originally set - * when calling LNetGet() or LNetPut() might've - * been overwritten if we're routing this message. - * Call lnet_return_tx_credits_locked() to return - * the credit this message consumed. The message will - * consume another credit when it gets resent. - */ - msg->msg_target.nid = msg->msg_hdr.dest_nid; - lnet_msg_decommit_tx(msg, -EAGAIN); - msg->msg_sending = 0; - msg->msg_receiving = 0; - msg->msg_target_is_router = 0; - - CDEBUG(D_NET, "%s->%s:%s:%s - queuing for resend\n", - libcfs_nid2str(msg->msg_hdr.src_nid), - libcfs_nid2str(msg->msg_hdr.dest_nid), - lnet_msgtyp2str(msg->msg_type), - lnet_health_error2str(hstatus)); - - list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]); - lnet_net_unlock(msg->msg_tx_cpt); - - wake_up(&the_lnet.ln_mt_waitq); - return 0; + /* no resend is needed */ + return -1; } static void @@ -892,7 +982,6 @@ lnet_finalize(struct lnet_msg *msg, int status) int my_slot; int cpt; int rc; - int i; LASSERT(!in_interrupt()); @@ -915,7 +1004,6 @@ lnet_finalize(struct lnet_msg *msg, int status) * put on the resend queue. */ if (!lnet_health_check(msg)) - /* Message is queued for resend */ return; } @@ -947,27 +1035,20 @@ again: lnet_net_lock(cpt); container = the_lnet.ln_msg_containers[cpt]; - list_add_tail(&msg->msg_list, &container->msc_finalizing); /* Recursion breaker. 
Don't complete the message here if I am (or * enough other threads are) already completing messages */ + my_slot = lnet_check_finalize_recursion_locked(msg, + &container->msc_finalizing, + container->msc_nfinalizers, + container->msc_finalizers); - my_slot = -1; - for (i = 0; i < container->msc_nfinalizers; i++) { - if (container->msc_finalizers[i] == current) - break; - - if (my_slot < 0 && container->msc_finalizers[i] == NULL) - my_slot = i; - } - - if (i < container->msc_nfinalizers || my_slot < 0) { + /* enough threads are finalizing */ + if (my_slot == -1) { lnet_net_unlock(cpt); return; } - container->msc_finalizers[my_slot] = current; - rc = 0; while (!list_empty(&container->msc_finalizing)) { msg = list_entry(container->msc_finalizing.next, @@ -1025,6 +1106,13 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container) sizeof(*container->msc_finalizers)); container->msc_finalizers = NULL; } + + if (container->msc_resenders != NULL) { + LIBCFS_FREE(container->msc_resenders, + container->msc_nfinalizers * + sizeof(*container->msc_resenders)); + container->msc_resenders = NULL; + } container->msc_init = 0; } @@ -1037,6 +1125,7 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt) INIT_LIST_HEAD(&container->msc_active); INIT_LIST_HEAD(&container->msc_finalizing); + INIT_LIST_HEAD(&container->msc_resending); /* number of CPUs */ container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt); @@ -1053,6 +1142,16 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt) return -ENOMEM; } + LIBCFS_CPT_ALLOC(container->msc_resenders, lnet_cpt_table(), cpt, + container->msc_nfinalizers * + sizeof(*container->msc_resenders)); + + if (container->msc_resenders == NULL) { + CERROR("Failed to allocate message resenders\n"); + lnet_msg_container_cleanup(container); + return -ENOMEM; + } + return rc; } -- 1.8.3.1