Whamcloud - gitweb
LU-12402 lnet: handle recursion in resend 67/38367/2
authorAmir Shehata <ashehata@whamcloud.com>
Sat, 25 Apr 2020 17:19:32 +0000 (10:19 -0700)
committerOleg Drokin <green@whamcloud.com>
Fri, 1 May 2020 04:33:42 +0000 (04:33 +0000)
When we're resending a message we have to decommit it first. This
could potentially result in another message being picked up from the
queue and sent, which could fail immediately and be finalized, causing
recursion. This problem was observed when a router was being shutdown.

This patch uses the same mechanism used in lnet_finalize() to limit
recursion. If a thread is already finalizing a message and it gets
into path where it starts finalizing a second, then that message
is queued and handled later.

Lustre-change: https://review.whamcloud.com/35431
Lustre-commit: ad9243693c9a5a5b2c34165ad853ddf5ceec4617

Signed-off-by: Amir Shehata <ashehata@whamcloud.com>
Change-Id: Iace64c7ddb1f56a0a63b030df6a5ab103ae6c645
Reviewed-on: https://review.whamcloud.com/38367
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Serguei Smirnov <ssmirnov@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/include/lnet/lib-types.h
lnet/lnet/lib-msg.c

index 8771481..c653167 100644 (file)
@@ -967,9 +967,13 @@ struct lnet_msg_container {
        int                     msc_nfinalizers;
        /* msgs waiting to complete finalizing */
        struct list_head        msc_finalizing;
+       /* msgs waiting to be resent */
+       struct list_head        msc_resending;
        struct list_head        msc_active;     /* active message list */
        /* threads doing finalization */
        void                    **msc_finalizers;
+       /* threads doing resends */
+       void                    **msc_resenders;
 };
 
 /* Peer Discovery states */
index 55b8d80..8822d15 100644 (file)
@@ -586,6 +586,174 @@ lnet_incr_hstats(struct lnet_msg *msg, enum lnet_msg_hstatus hstatus)
        }
 }
 
+static void
+lnet_resend_msg_locked(struct lnet_msg *msg)
+{
+       msg->msg_retry_count++;
+
+       /*
+        * remove message from the active list and reset it to prepare
+        * for a resend. Two exceptions to this
+        *
+        * 1. the router case. When a message is being routed it is
+        * committed for rx when received and committed for tx when
+        * forwarded. We don't want to remove it from the active list, since
+        * code which handles receiving expects it to remain on the active
+        * list.
+        *
+        * 2. The REPLY case. Reply messages use the same message
+        * structure for the GET that was received.
+        */
+       if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) {
+               list_del_init(&msg->msg_activelist);
+               msg->msg_onactivelist = 0;
+       }
+       /*
+        * The msg_target.nid which was originally set
+        * when calling LNetGet() or LNetPut() might've
+        * been overwritten if we're routing this message.
+        * Call lnet_msg_decommit_tx() to return the credit
+        * this message consumed. The message will
+        * consume another credit when it gets resent.
+        */
+       msg->msg_target.nid = msg->msg_hdr.dest_nid;
+       lnet_msg_decommit_tx(msg, -EAGAIN);
+       msg->msg_sending = 0;
+       msg->msg_receiving = 0;
+       msg->msg_target_is_router = 0;
+
+       CDEBUG(D_NET, "%s->%s:%s:%s - queuing msg (%p) for resend\n",
+              libcfs_nid2str(msg->msg_hdr.src_nid),
+              libcfs_nid2str(msg->msg_hdr.dest_nid),
+              lnet_msgtyp2str(msg->msg_type),
+              lnet_health_error2str(msg->msg_health_status), msg);
+
+       list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]);
+
+       wake_up(&the_lnet.ln_mt_waitq);
+}
+
+int
+lnet_check_finalize_recursion_locked(struct lnet_msg *msg,
+                                    struct list_head *containerq,
+                                    int nworkers, void **workers)
+{
+       int my_slot = -1;
+       int i;
+
+       list_add_tail(&msg->msg_list, containerq);
+
+       for (i = 0; i < nworkers; i++) {
+               if (workers[i] == current)
+                       break;
+
+               if (my_slot < 0 && workers[i] == NULL)
+                       my_slot = i;
+       }
+
+       if (i < nworkers || my_slot < 0)
+               return -1;
+
+       workers[my_slot] = current;
+
+       return my_slot;
+}
+
+int
+lnet_attempt_msg_resend(struct lnet_msg *msg)
+{
+       struct lnet_msg_container *container;
+       int my_slot;
+       int cpt;
+
+       /* we can only resend tx_committed messages */
+       LASSERT(msg->msg_tx_committed);
+
+       /* don't resend recovery messages */
+       if (msg->msg_recovery) {
+               CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n",
+                       libcfs_nid2str(msg->msg_from),
+                       libcfs_nid2str(msg->msg_target.nid),
+                       msg->msg_retry_count);
+               return -ENOTRECOVERABLE;
+       }
+
+       /*
+        * if we explicitly indicated we don't want to resend then just
+        * return
+        */
+       if (msg->msg_no_resend) {
+               CDEBUG(D_NET, "msg %s->%s requested no resend. retry# %d\n",
+                       libcfs_nid2str(msg->msg_from),
+                       libcfs_nid2str(msg->msg_target.nid),
+                       msg->msg_retry_count);
+               return -ENOTRECOVERABLE;
+       }
+
+       /* check if the message has exceeded the number of retries */
+       if (msg->msg_retry_count >= lnet_retry_count) {
+               CNETERR("msg %s->%s exceeded retry count %d\n",
+                       libcfs_nid2str(msg->msg_from),
+                       libcfs_nid2str(msg->msg_target.nid),
+                       msg->msg_retry_count);
+               return -ENOTRECOVERABLE;
+       }
+
+       cpt = msg->msg_tx_cpt;
+       lnet_net_lock(cpt);
+
+       /* check again under lock */
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
+               lnet_net_unlock(cpt);
+               return -ESHUTDOWN;
+       }
+
+       container = the_lnet.ln_msg_containers[cpt];
+       my_slot =
+               lnet_check_finalize_recursion_locked(msg,
+                                       &container->msc_resending,
+                                       container->msc_nfinalizers,
+                                       container->msc_resenders);
+
+       /* enough threads are resending */
+       if (my_slot == -1) {
+               lnet_net_unlock(cpt);
+               return 0;
+       }
+
+       while (!list_empty(&container->msc_resending)) {
+               msg = list_entry(container->msc_resending.next,
+                                       struct lnet_msg, msg_list);
+               list_del(&msg->msg_list);
+
+               /*
+                * resending the message will require us to call
+                * lnet_msg_decommit_tx() which will return the credit
+                * which this message holds. This could trigger another
+                * queued message to be sent. If that message fails and
+                * requires a resend we will recurse.
+                * But since at this point the slot is taken, the message
+                * will be queued in the container and dealt with
+                * later. This breaks the recursion.
+                */
+               lnet_resend_msg_locked(msg);
+       }
+
+       /*
+        * msc_resenders is an array of process pointers. Each entry holds
+        * a pointer to the current process operating on the message. An
+        * array entry is created per CPT. If the array slot is already
+        * set, then it means that there is a thread on the CPT currently
+        * resending a message.
+        * Once the thread finishes clear the slot to enable the thread to
+        * take on more resend work.
+        */
+       container->msc_resenders[my_slot] = NULL;
+       lnet_net_unlock(cpt);
+
+       return 0;
+}
+
 /*
  * Do a health check on the message:
  * return -1 if we're not going to handle the error or
@@ -653,7 +821,7 @@ lnet_health_check(struct lnet_msg *msg)
        case LNET_MSG_STATUS_LOCAL_TIMEOUT:
                lnet_handle_local_failure(msg);
                /* add to the re-send queue */
-               goto resend;
+               return lnet_attempt_msg_resend(msg);
 
        /*
         * These errors will not trigger a resend so simply
@@ -669,7 +837,7 @@ lnet_health_check(struct lnet_msg *msg)
         */
        case LNET_MSG_STATUS_REMOTE_DROPPED:
                lnet_handle_remote_failure(msg->msg_txpeer);
-               goto resend;
+               return lnet_attempt_msg_resend(msg);
 
        case LNET_MSG_STATUS_REMOTE_ERROR:
        case LNET_MSG_STATUS_REMOTE_TIMEOUT:
@@ -680,86 +848,8 @@ lnet_health_check(struct lnet_msg *msg)
                LBUG();
        }
 
-resend:
-       /* don't resend recovery messages */
-       if (msg->msg_recovery) {
-               CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n",
-                       libcfs_nid2str(msg->msg_from),
-                       libcfs_nid2str(msg->msg_target.nid),
-                       msg->msg_retry_count);
-               return -1;
-       }
-
-       /*
-        * if we explicitly indicated we don't want to resend then just
-        * return
-        */
-       if (msg->msg_no_resend) {
-               CDEBUG(D_NET, "msg %s->%s requested no resend. retry# %d\n",
-                       libcfs_nid2str(msg->msg_from),
-                       libcfs_nid2str(msg->msg_target.nid),
-                       msg->msg_retry_count);
-               return -1;
-       }
-
-       /* check if the message has exceeded the number of retries */
-       if (msg->msg_retry_count >= lnet_retry_count) {
-               CNETERR("msg %s->%s exceeded retry count %d\n",
-                       libcfs_nid2str(msg->msg_from),
-                       libcfs_nid2str(msg->msg_target.nid),
-                       msg->msg_retry_count);
-               return -1;
-       }
-       msg->msg_retry_count++;
-
-       lnet_net_lock(msg->msg_tx_cpt);
-
-       /* check again under lock */
-       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
-               lnet_net_unlock(msg->msg_tx_cpt);
-               return -1;
-       }
-
-       /*
-        * remove message from the active list and reset it in preparation
-        * for a resend. Two exception to this
-        *
-        * 1. the router case, whe a message is committed for rx when
-        * received, then tx when it is sent. When committed to both tx and
-        * rx we don't want to remove it from the active list.
-        *
-        * 2. The REPLY case since it uses the same msg block for the GET
-        * that was received.
-        */
-       if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) {
-               list_del_init(&msg->msg_activelist);
-               msg->msg_onactivelist = 0;
-       }
-       /*
-        * The msg_target.nid which was originally set
-        * when calling LNetGet() or LNetPut() might've
-        * been overwritten if we're routing this message.
-        * Call lnet_return_tx_credits_locked() to return
-        * the credit this message consumed. The message will
-        * consume another credit when it gets resent.
-        */
-       msg->msg_target.nid = msg->msg_hdr.dest_nid;
-       lnet_msg_decommit_tx(msg, -EAGAIN);
-       msg->msg_sending = 0;
-       msg->msg_receiving = 0;
-       msg->msg_target_is_router = 0;
-
-       CDEBUG(D_NET, "%s->%s:%s:%s - queuing for resend\n",
-              libcfs_nid2str(msg->msg_hdr.src_nid),
-              libcfs_nid2str(msg->msg_hdr.dest_nid),
-              lnet_msgtyp2str(msg->msg_type),
-              lnet_health_error2str(hstatus));
-
-       list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]);
-       lnet_net_unlock(msg->msg_tx_cpt);
-
-       wake_up(&the_lnet.ln_mt_waitq);
-       return 0;
+       /* no resend is needed */
+       return -1;
 }
 
 static void
@@ -892,7 +982,6 @@ lnet_finalize(struct lnet_msg *msg, int status)
        int my_slot;
        int cpt;
        int rc;
-       int i;
 
        LASSERT(!in_interrupt());
 
@@ -915,7 +1004,6 @@ lnet_finalize(struct lnet_msg *msg, int status)
                 * put on the resend queue.
                 */
                if (!lnet_health_check(msg))
-                       /* Message is queued for resend */
                        return;
        }
 
@@ -947,27 +1035,20 @@ again:
        lnet_net_lock(cpt);
 
        container = the_lnet.ln_msg_containers[cpt];
-       list_add_tail(&msg->msg_list, &container->msc_finalizing);
 
        /* Recursion breaker.  Don't complete the message here if I am (or
         * enough other threads are) already completing messages */
+       my_slot = lnet_check_finalize_recursion_locked(msg,
+                                               &container->msc_finalizing,
+                                               container->msc_nfinalizers,
+                                               container->msc_finalizers);
 
-       my_slot = -1;
-       for (i = 0; i < container->msc_nfinalizers; i++) {
-               if (container->msc_finalizers[i] == current)
-                       break;
-
-               if (my_slot < 0 && container->msc_finalizers[i] == NULL)
-                       my_slot = i;
-       }
-
-       if (i < container->msc_nfinalizers || my_slot < 0) {
+       /* enough threads are resending */
+       if (my_slot == -1) {
                lnet_net_unlock(cpt);
                return;
        }
 
-       container->msc_finalizers[my_slot] = current;
-
        rc = 0;
        while (!list_empty(&container->msc_finalizing)) {
                msg = list_entry(container->msc_finalizing.next,
@@ -1025,6 +1106,13 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
                            sizeof(*container->msc_finalizers));
                container->msc_finalizers = NULL;
        }
+
+       if (container->msc_resenders != NULL) {
+               LIBCFS_FREE(container->msc_resenders,
+                           container->msc_nfinalizers *
+                           sizeof(*container->msc_resenders));
+               container->msc_resenders = NULL;
+       }
        container->msc_init = 0;
 }
 
@@ -1037,6 +1125,7 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
 
        INIT_LIST_HEAD(&container->msc_active);
        INIT_LIST_HEAD(&container->msc_finalizing);
+       INIT_LIST_HEAD(&container->msc_resending);
 
        /* number of CPUs */
        container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
@@ -1053,6 +1142,16 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
                return -ENOMEM;
        }
 
+       LIBCFS_CPT_ALLOC(container->msc_resenders, lnet_cpt_table(), cpt,
+                        container->msc_nfinalizers *
+                        sizeof(*container->msc_resenders));
+
+       if (container->msc_resenders == NULL) {
+               CERROR("Failed to allocate message resenders\n");
+               lnet_msg_container_cleanup(container);
+               return -ENOMEM;
+       }
+
        return rc;
 }