return rc;
}
+/* NB: caller shall hold a ref on 'lp' as I'd drop LNET_LOCK */
+/* Ask the peer's LND for its latest aliveness news and refresh the
+ * bookkeeping on 'lp':
+ * - lp_last_query is always stamped with the current time, so callers
+ *   (lnet_peer_alive_locked) can rate-limit repeat queries;
+ * - lp_last_alive is updated only if the LND reported a non-zero
+ *   timestamp (i.e. it actually has news).
+ * LNET_LOCK is dropped around the lnd_query() callback and re-taken
+ * afterwards, hence the caller-must-hold-a-ref requirement above. */
+void
+lnet_ni_peer_alive(lnet_peer_t *lp)
+{
+ time_t last_alive = 0;
+ lnet_ni_t *ni = lp->lp_ni;
+
+ /* only meaningful when peer timeouts are enabled and the LND
+ * implements the query method */
+ LASSERT (ni != NULL);
+ LASSERT (ni->ni_peertimeout > 0);
+ LASSERT (ni->ni_lnd->lnd_query != NULL);
+
+ LNET_UNLOCK();
+ (ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
+ LNET_LOCK();
+
+ /* record the query time even if the LND had nothing new */
+ lp->lp_last_query = cfs_time_current_sec();
+
+ if (last_alive != 0) /* NI has updated timestamp */
+ lp->lp_last_alive = last_alive;
+ return;
+}
+
+/* NB: always called with LNET_LOCK held */
+/* Decide whether peer 'lp' should be considered alive at time 'now',
+ * based purely on cached state (no LND query here).
+ * Returns 1 if alive, 0 if dead.
+ * A peer is dead if it was explicitly marked dead (lp_alive clear,
+ * with at least one prior alive event) and no newer alive news has
+ * arrived since — presumably lp_timestamp records when the dead mark
+ * was set; TODO confirm against the lnet_peer_t declaration.
+ * Otherwise it is alive iff lp_last_alive is within ni_peertimeout
+ * seconds of 'now'. */
+static inline int
+lnet_peer_is_alive (lnet_peer_t *lp, time_t now)
+{
+ lnet_ni_t *ni = lp->lp_ni;
+ time_t deadline;
+ int alive;
+
+ LASSERT (ni != NULL);
+ LASSERT (ni->ni_peertimeout > 0);
+
+ /* explicit dead notification newer than the last alive news wins */
+ if (!lp->lp_alive && lp->lp_alive_count > 0 &&
+ cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
+ return 0;
+
+ deadline = cfs_time_add(lp->lp_last_alive, ni->ni_peertimeout);
+ alive = cfs_time_after(deadline, now);
+ if (alive && !lp->lp_alive) /* update obsolete lp_alive */
+ lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);
+
+ return alive;
+}
+
+/* don't query LND about aliveness of a dead peer more frequently than: */
+static int lnet_queryinterval = 1; /* 1 second */
+
+/* NB: returns 1 when alive, 0 when dead, negative when error;
+ * may drop the LNET_LOCK */
+/* Full aliveness check for peer 'lp':
+ * 1. if peer timeouts are disabled on this NI, report -ENODEV;
+ * 2. if the cached state already says alive, done;
+ * 3. otherwise (re)query the LND — but no more often than
+ *    lnet_queryinterval seconds for a peer that looks dead — and
+ *    re-check the cached state with the fresh news;
+ * 4. if still dead, push a dead notification via lnet_notify_locked().
+ * The LNET_LOCK may be dropped (inside lnet_ni_peer_alive), so the
+ * caller must hold a ref on 'lp'. */
+int
+lnet_peer_alive_locked (lnet_peer_t *lp)
+{
+ lnet_ni_t *ni = lp->lp_ni;
+ time_t now = cfs_time_current_sec();
+
+ LASSERT (ni != NULL);
+
+ if (ni->ni_peertimeout <= 0) /* disabled */
+ return -ENODEV;
+
+ if (lnet_peer_is_alive(lp, now))
+ return 1;
+
+ /* peer appears dead, should we query right now? */
+ if (lp->lp_last_query != 0) {
+ time_t deadline =
+ cfs_time_add(lp->lp_last_query,
+ lnet_queryinterval);
+
+ if (cfs_time_before(now, deadline)) {
+ /* within the rate limit: stick with "dead" without
+ * re-querying.  lp_alive set here is unexpected —
+ * lnet_peer_is_alive() just said dead — so warn.
+ * NOTE(review): casting time_t to int for the
+ * printout truncates on 64-bit time_t; harmless for
+ * a diagnostic but worth confirming. */
+ if (lp->lp_alive)
+ CWARN("Unexpected aliveness of peer %s: "
+ "%d < %d (%d/%d)\n",
+ libcfs_nid2str(lp->lp_nid),
+ (int)now, (int)deadline,
+ lnet_queryinterval, ni->ni_peertimeout);
+ return 0;
+ }
+ }
+
+ /* query LND for latest aliveness news */
+ lnet_ni_peer_alive(lp);
+
+ /* NB 'now' is not refreshed after the lock was dropped above, so
+ * this re-check uses the pre-query clock reading */
+ if (lnet_peer_is_alive(lp, now))
+ return 1;
+
+ /* definitely dead: record/broadcast the verdict */
+ lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
+ return 0;
+}
+
int
lnet_post_send_locked (lnet_msg_t *msg, int do_send)
{
/* lnet_send is going to LNET_UNLOCK immediately after this, so it sets
* do_send FALSE and I don't do the unlock/send/lock bit. I return
- * EAGAIN if msg blocked and 0 if sent or OK to send */
+ * EAGAIN if msg blocked, EHOSTUNREACH if msg_txpeer appears dead, and
+ * 0 if sent or OK to send */
lnet_peer_t *lp = msg->msg_txpeer;
lnet_ni_t *ni = lp->lp_ni;
LASSERT (!do_send || msg->msg_delayed);
LASSERT (!msg->msg_receiving);
+ /* NB 'lp' is always the next hop */
+ if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
+ lnet_peer_alive_locked(lp) == 0) {
+ LNET_UNLOCK();
+
+ CDEBUG(D_NETERROR, "Dropping message for %s: peer not alive\n",
+ libcfs_id2str(msg->msg_target));
+ if (do_send)
+ lnet_finalize(ni, msg, -EHOSTUNREACH);
+
+ LNET_LOCK();
+ return EHOSTUNREACH;
+ }
+
if (!msg->msg_peertxcredit) {
LASSERT ((lp->lp_txcredits < 0) == !list_empty(&lp->lp_txq));
{
/* lnet_parse is going to LNET_UNLOCK immediately after this, so it
* sets do_recv FALSE and I don't do the unlock/send/lock bit. I
- * return EAGAIN if msg blocked and 0 if sent or OK to send */
+ * return EAGAIN if msg blocked and 0 if received or OK to receive */
lnet_peer_t *lp = msg->msg_rxpeer;
lnet_rtrbufpool_t *rbp;
lnet_rtrbuf_t *rb;
rc = lnet_post_send_locked(msg, 0);
LNET_UNLOCK();
+ if (rc == EHOSTUNREACH)
+ return -EHOSTUNREACH;
+
if (rc == 0)
lnet_ni_send(src_ni, msg);
int rc = 0;
int for_me;
lnet_msg_t *msg;
+ lnet_pid_t dest_pid;
lnet_nid_t dest_nid;
lnet_nid_t src_nid;
__u32 payload_length;
type = le32_to_cpu(hdr->type);
src_nid = le64_to_cpu(hdr->src_nid);
dest_nid = le64_to_cpu(hdr->dest_nid);
+ dest_pid = le32_to_cpu(hdr->dest_pid);
payload_length = le32_to_cpu(hdr->payload_length);
for_me = (ni->ni_nid == dest_nid);
LASSERT (for_me);
#else
if (!for_me) {
- msg->msg_target.pid = le32_to_cpu(hdr->dest_pid);
+ msg->msg_target.pid = dest_pid;
msg->msg_target.nid = dest_nid;
msg->msg_routing = 1;
msg->msg_offset = 0;
goto free_drop;
}
}
-
lnet_commit_routedmsg(msg);
rc = lnet_post_routed_recv_locked(msg, 0);
LNET_UNLOCK();
msg->msg_hdr.src_nid = src_nid;
msg->msg_hdr.src_pid = le32_to_cpu(msg->msg_hdr.src_pid);
msg->msg_hdr.dest_nid = dest_nid;
- msg->msg_hdr.dest_pid = le32_to_cpu(msg->msg_hdr.dest_pid);
+ msg->msg_hdr.dest_pid = dest_pid;
msg->msg_hdr.payload_length = payload_length;
msg->msg_ev.sender = from_nid;