Whamcloud - gitweb
LU-9921 lnet: resolve unsafe list access 23/28723/6
authorAmir Shehata <amir.shehata@intel.com>
Sat, 26 Aug 2017 04:18:16 +0000 (21:18 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 13 Sep 2017 03:37:43 +0000 (03:37 +0000)
Use list_for_each_entry_safe() when accessing messages on pending
queue. Remove the message from the list before calling lnet_finalize()
or lnet_send().

When destroying the peer, make sure to queue all pending messages on
a global list. We cannot resend them at this point because the
cpt lock is held. Unlocking the cpt lock could lead to an inconsistent
state. Use the discovery thread to check whether the global list is
non-empty and, if so, resend all messages on the list. Use a new spin
lock to protect the resend message list. I steered clear of reusing
an existing lock because LNet locking is complex and reusing a lock
would add to this complexity. Using a new lock makes the code easier
to understand.

Verify that all lists are empty before destroying the peer_ni

Signed-off-by: Amir Shehata <amir.shehata@intel.com>
Change-Id: Ia081419ec5ed2be5823cfbca7e050138a229ab9c
Reviewed-on: https://review.whamcloud.com/28723
Tested-by: Jenkins
Reviewed-by: Olaf Weber <olaf.weber@hpe.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lnet/include/lnet/lib-types.h
lnet/lnet/api-ni.c
lnet/lnet/peer.c

index 7c7a9bd..bd4cc5b 100644 (file)
@@ -935,6 +935,10 @@ typedef struct lnet {
        struct lnet_ni                  *ln_loni;
        /* network zombie list */
        struct list_head                ln_net_zombie;
+       /* resend messages list */
+       struct list_head                ln_msg_resend;
+       /* spin lock to protect the msg resend list */
+       spinlock_t                      ln_msg_resend_lock;
 
        /* remote networks with routes to them */
        struct list_head                *ln_remote_nets_hash;
index c725e6e..2852fd8 100644 (file)
@@ -209,6 +209,7 @@ static void
 lnet_init_locks(void)
 {
        spin_lock_init(&the_lnet.ln_eq_wait_lock);
+       spin_lock_init(&the_lnet.ln_msg_resend_lock);
        init_waitqueue_head(&the_lnet.ln_eq_waitq);
        init_waitqueue_head(&the_lnet.ln_rc_waitq);
        mutex_init(&the_lnet.ln_lnd_mutex);
@@ -1683,6 +1684,10 @@ static void
 lnet_shutdown_lndnets(void)
 {
        struct lnet_net *net;
+       struct list_head resend;
+       struct lnet_msg *msg, *tmp;
+
+       INIT_LIST_HEAD(&resend);
 
        /* NB called holding the global mutex */
 
@@ -1718,6 +1723,15 @@ lnet_shutdown_lndnets(void)
                lnet_shutdown_lndnet(net);
        }
 
+       spin_lock(&the_lnet.ln_msg_resend_lock);
+       list_splice(&the_lnet.ln_msg_resend, &resend);
+       spin_unlock(&the_lnet.ln_msg_resend_lock);
+
+       list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
+               list_del_init(&msg->msg_list);
+               lnet_finalize(msg, -ECANCELED);
+       }
+
        lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_state = LNET_STATE_SHUTDOWN;
        lnet_net_unlock(LNET_LOCK_EX);
@@ -2061,6 +2075,7 @@ int lnet_lib_init(void)
        INIT_LIST_HEAD(&the_lnet.ln_lnds);
        INIT_LIST_HEAD(&the_lnet.ln_net_zombie);
        INIT_LIST_HEAD(&the_lnet.ln_rcd_zombie);
+       INIT_LIST_HEAD(&the_lnet.ln_msg_resend);
        INIT_LIST_HEAD(&the_lnet.ln_rcd_deathrow);
 
        /* The hash table size is the number of bits it takes to express the set
index 7865c71..d24c201 100644 (file)
@@ -267,6 +267,23 @@ lnet_destroy_peer_locked(struct lnet_peer *lp)
        if (lp->lp_data)
                lnet_ping_buffer_decref(lp->lp_data);
 
+       /*
+        * if there are messages still on the pending queue, then make
+        * sure to queue them on the ln_msg_resend list so they can be
+        * resent at a later point if the discovery thread is still
+        * running.
+        * If the discovery thread has stopped, then the wakeup will be a
+        * no-op, and it is expected the lnet_shutdown_lndnets() will
+        * eventually be called, which will traverse this list and
+        * finalize the messages on the list.
+        * We cannot resend them now because we're holding the cpt lock.
+        * Releasing the lock can cause an inconsistent state.
+        */
+       spin_lock(&the_lnet.ln_msg_resend_lock);
+       list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
+       spin_unlock(&the_lnet.ln_msg_resend_lock);
+       wake_up(&the_lnet.ln_dc_waitq);
+
        LIBCFS_FREE(lp, sizeof(*lp));
 }
 
@@ -556,8 +573,8 @@ lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
 void
 lnet_peer_tables_cleanup(struct lnet_net *net)
 {
-       int                             i;
-       struct lnet_peer_table          *ptable;
+       int i;
+       struct lnet_peer_table *ptable;
 
        LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
        /* If just deleting the peers for a NI, get rid of any routes these
@@ -1531,6 +1548,8 @@ lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
        LASSERT(lpni->lpni_rtr_refcount == 0);
        LASSERT(list_empty(&lpni->lpni_txq));
        LASSERT(lpni->lpni_txqnob == 0);
+       LASSERT(list_empty(&lpni->lpni_peer_nis));
+       LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
 
        lpn = lpni->lpni_peer_net;
        lpni->lpni_peer_net = NULL;
@@ -1731,7 +1750,7 @@ static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
  */
 static void lnet_peer_discovery_complete(struct lnet_peer *lp)
 {
-       struct lnet_msg *msg = NULL;
+       struct lnet_msg *msg, *tmp;
        int rc = 0;
        struct list_head pending_msgs;
 
@@ -1747,7 +1766,8 @@ static void lnet_peer_discovery_complete(struct lnet_peer *lp)
        lnet_net_unlock(LNET_LOCK_EX);
 
        /* iterate through all pending messages and send them again */
-       list_for_each_entry(msg, &pending_msgs, msg_list) {
+       list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
+               list_del_init(&msg->msg_list);
                if (lp->lp_dc_error) {
                        lnet_finalize(msg, lp->lp_dc_error);
                        continue;
@@ -2974,6 +2994,8 @@ static int lnet_peer_discovery_wait_for_work(void)
                        break;
                if (!list_empty(&the_lnet.ln_dc_request))
                        break;
+               if (!list_empty(&the_lnet.ln_msg_resend))
+                       break;
                if (lnet_peer_dc_timed_out(ktime_get_real_seconds()))
                        break;
                lnet_net_unlock(cpt);
@@ -2999,6 +3021,48 @@ static int lnet_peer_discovery_wait_for_work(void)
        return rc;
 }
 
+/*
+ * Messages that were pending on a destroyed peer will be put on a global
+ * resend list. The message resend list will be checked by
+ * the discovery thread when it wakes up, and will resend messages. These
+ * messages can still be sendable in the case where the lpni which was the
+ * initial cause of the message re-queue was transferred to another peer.
+ *
+ * It is possible that LNet could be shutdown while we're iterating
+ * through the list. lnet_shutdown_lndnets() will attempt to access the
+ * resend list, but will have to wait until the spinlock is released, by
+ * which time there shouldn't be any more messages on the resend list.
+ * During shutdown lnet_send() will fail and lnet_finalize() will be called
+ * for the messages so they can be released. The other case is that
+ * lnet_shutdown_lndnets() can finalize all the messages before this
+ * function can visit the resend list, in which case this function will be
+ * a no-op.
+ */
+static void lnet_resend_msgs(void)
+{
+       struct lnet_msg *msg, *tmp;
+       struct list_head resend;
+       int rc;
+
+       INIT_LIST_HEAD(&resend);
+
+       spin_lock(&the_lnet.ln_msg_resend_lock);
+       list_splice(&the_lnet.ln_msg_resend, &resend);
+       spin_unlock(&the_lnet.ln_msg_resend_lock);
+
+       list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
+               list_del_init(&msg->msg_list);
+               rc = lnet_send(msg->msg_src_nid_param, msg,
+                              msg->msg_rtr_nid_param);
+               if (rc < 0) {
+                       CNETERR("Error sending %s to %s: %d\n",
+                              lnet_msgtyp2str(msg->msg_type),
+                              libcfs_id2str(msg->msg_target), rc);
+                       lnet_finalize(msg, rc);
+               }
+       }
+}
+
 /* The discovery thread. */
 static int lnet_peer_discovery(void *arg)
 {
@@ -3013,6 +3077,8 @@ static int lnet_peer_discovery(void *arg)
                if (lnet_peer_discovery_wait_for_work())
                        break;
 
+               lnet_resend_msgs();
+
                if (lnet_push_target_resize_needed())
                        lnet_push_target_resize();