Whamcloud - gitweb
b=19694
[fs/lustre-release.git] / lnet / lnet / lib-move.c
index 0d443a7..a0bebbf 100644 (file)
@@ -899,12 +899,104 @@ lnet_eager_recv_locked(lnet_msg_t *msg)
         return rc;
 }
 
+/* NB: caller shall hold a ref on 'lp' as I'd drop LNET_LOCK */
+void
+lnet_ni_peer_alive(lnet_peer_t *lp)
+{
+        time_t      last_alive = 0;
+        lnet_ni_t  *ni = lp->lp_ni;
+
+        LASSERT (ni != NULL);
+        LASSERT (ni->ni_peertimeout > 0);
+        LASSERT (ni->ni_lnd->lnd_query != NULL);
+
+        LNET_UNLOCK();
+        (ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
+        LNET_LOCK();
+
+        lp->lp_last_query = cfs_time_current_sec();
+
+        if (last_alive != 0) /* NI has updated timestamp */
+                lp->lp_last_alive = last_alive;
+        return;
+}
+
+/* NB: always called with LNET_LOCK held */
+static inline int
+lnet_peer_is_alive (lnet_peer_t *lp, time_t now)
+{
+        lnet_ni_t  *ni = lp->lp_ni;
+        time_t      deadline;
+        int         alive;
+
+        LASSERT (ni != NULL);
+        LASSERT (ni->ni_peertimeout > 0);
+
+        if (!lp->lp_alive && lp->lp_alive_count > 0 &&
+            cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
+                        return 0;
+
+        deadline = cfs_time_add(lp->lp_last_alive, ni->ni_peertimeout);
+        alive = cfs_time_after(deadline, now);
+        if (alive && !lp->lp_alive) /* update obsolete lp_alive */
+                lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);
+
+        return alive;
+}
+
+/* don't query LND about aliveness of a dead peer more frequently than: */
+static int lnet_queryinterval = 1; /* 1 second */
+
+/* NB: returns 1 when alive, 0 when dead, negative when error;
+ *     may drop the LNET_LOCK */
+int
+lnet_peer_alive_locked (lnet_peer_t *lp)
+{
+        lnet_ni_t  *ni = lp->lp_ni;
+        time_t      now = cfs_time_current_sec();
+
+        LASSERT (ni != NULL);
+
+        if (ni->ni_peertimeout <= 0)  /* disabled */
+                return -ENODEV;
+
+        if (lnet_peer_is_alive(lp, now))
+                return 1;
+
+        /* peer appears dead, should we query right now? */
+        if (lp->lp_last_query != 0) {
+                time_t deadline =
+                        cfs_time_add(lp->lp_last_query,
+                                     lnet_queryinterval);
+
+                if (cfs_time_before(now, deadline)) {
+                        if (lp->lp_alive)
+                                CWARN("Unexpected aliveness of peer %s: "
+                                      "%d < %d (%d/%d)\n",
+                                      libcfs_nid2str(lp->lp_nid),
+                                      (int)now, (int)deadline,
+                                      lnet_queryinterval, ni->ni_peertimeout);
+                        return 0;
+                }
+        }
+
+        /* query LND for latest aliveness news */
+        lnet_ni_peer_alive(lp);
+
+        if (lnet_peer_is_alive(lp, now))
+                return 1;
+
+        lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
+        return 0;
+}
+
 int
 lnet_post_send_locked (lnet_msg_t *msg, int do_send)
 {
         /* lnet_send is going to LNET_UNLOCK immediately after this, so it sets
          * do_send FALSE and I don't do the unlock/send/lock bit.  I return
-         * EAGAIN if msg blocked and 0 if sent or OK to send */
+         * EAGAIN if msg blocked, EHOSTUNREACH if msg_txpeer appears dead, and
+         * 0 if sent or OK to send */
         lnet_peer_t *lp = msg->msg_txpeer;
         lnet_ni_t   *ni = lp->lp_ni;
 
@@ -912,6 +1004,20 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send)
         LASSERT (!do_send || msg->msg_delayed);
         LASSERT (!msg->msg_receiving);
 
+        /* NB 'lp' is always the next hop */
+        if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
+            lnet_peer_alive_locked(lp) == 0) {
+                LNET_UNLOCK();
+
+                CDEBUG(D_NETERROR, "Dropping message for %s: peer not alive\n",
+                       libcfs_id2str(msg->msg_target));
+                if (do_send)
+                        lnet_finalize(ni, msg, -EHOSTUNREACH);
+
+                LNET_LOCK();
+                return EHOSTUNREACH;
+        }
+
         if (!msg->msg_peertxcredit) {
                 LASSERT ((lp->lp_txcredits < 0) == !list_empty(&lp->lp_txq));
 
@@ -993,7 +1099,7 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
 {
         /* lnet_parse is going to LNET_UNLOCK immediately after this, so it
          * sets do_recv FALSE and I don't do the unlock/send/lock bit.  I
-         * return EAGAIN if msg blocked and 0 if sent or OK to send */
+         * return EAGAIN if msg blocked and 0 if received or OK to receive */
         lnet_peer_t         *lp = msg->msg_rxpeer;
         lnet_rtrbufpool_t   *rbp;
         lnet_rtrbuf_t       *rb;
@@ -1327,6 +1433,9 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
         rc = lnet_post_send_locked(msg, 0);
         LNET_UNLOCK();
 
+        if (rc == EHOSTUNREACH)
+                return -EHOSTUNREACH;
+
         if (rc == 0)
                 lnet_ni_send(src_ni, msg);
 
@@ -1983,6 +2092,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         int            rc = 0;
         int            for_me;
         lnet_msg_t    *msg;
+        lnet_pid_t     dest_pid;
         lnet_nid_t     dest_nid;
         lnet_nid_t     src_nid;
         __u32          payload_length;
@@ -1993,6 +2103,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         type = le32_to_cpu(hdr->type);
         src_nid = le64_to_cpu(hdr->src_nid);
         dest_nid = le64_to_cpu(hdr->dest_nid);
+        dest_pid = le32_to_cpu(hdr->dest_pid);
         payload_length = le32_to_cpu(hdr->payload_length);
 
         for_me = (ni->ni_nid == dest_nid);
@@ -2120,7 +2231,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         LASSERT (for_me);
 #else
         if (!for_me) {
-                msg->msg_target.pid = le32_to_cpu(hdr->dest_pid);
+                msg->msg_target.pid = dest_pid;
                 msg->msg_target.nid = dest_nid;
                 msg->msg_routing = 1;
                 msg->msg_offset = 0;
@@ -2134,7 +2245,6 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
                                 goto free_drop;
                         }
                 }
-
                 lnet_commit_routedmsg(msg);
                 rc = lnet_post_routed_recv_locked(msg, 0);
                 LNET_UNLOCK();
@@ -2150,7 +2260,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         msg->msg_hdr.src_nid = src_nid;
         msg->msg_hdr.src_pid = le32_to_cpu(msg->msg_hdr.src_pid);
         msg->msg_hdr.dest_nid = dest_nid;
-        msg->msg_hdr.dest_pid = le32_to_cpu(msg->msg_hdr.dest_pid);
+        msg->msg_hdr.dest_pid = dest_pid;
         msg->msg_hdr.payload_length = payload_length;
 
         msg->msg_ev.sender = from_nid;