*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lnet/lnet/lib-msg.c
*
ev->status = 0;
ev->unlinked = 1;
ev->type = LNET_EVENT_UNLINK;
- lnet_md_deconstruct(md, &ev->md);
+ lnet_md_deconstruct(md, ev);
lnet_md2handle(&ev->md_handle, md);
EXIT;
}
if (ev_type == LNET_EVENT_SEND) {
/* event for active message */
- ev->target.nid = le64_to_cpu(hdr->dest_nid);
- ev->target.pid = le32_to_cpu(hdr->dest_pid);
- ev->initiator.nid = LNET_NID_ANY;
+ ev->target.nid = hdr->dest_nid;
+ ev->target.pid = hdr->dest_pid;
+ ev->initiator.nid = LNET_ANY_NID;
ev->initiator.pid = the_lnet.ln_pid;
- ev->source.nid = LNET_NID_ANY;
+ ev->source.nid = LNET_ANY_NID;
ev->source.pid = the_lnet.ln_pid;
- ev->sender = LNET_NID_ANY;
+ ev->sender = LNET_ANY_NID;
} else {
/* event for passive message */
ev->target.pid = hdr->dest_pid;
/* Multi-Rail: track source NID. */
ev->source.pid = hdr->src_pid;
ev->source.nid = hdr->src_nid;
ev->rlength = hdr->payload_length;
ev->sender = msg->msg_from;
ev->mlength = msg->msg_wanted;
ev->offset = msg->msg_offset;
/* build umd in event */
lnet_md2handle(&msg->msg_ev.md_handle, md);
- lnet_md_deconstruct(md, &msg->msg_ev.md);
+ lnet_md_deconstruct(md, &msg->msg_ev);
}
static int
ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
- lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.source, 0, 0);
+ lnet_prep_send(msg, LNET_MSG_ACK, &msg->msg_ev.source, 0, 0);
msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
- rc = lnet_send(msg->msg_ev.target.nid, msg, msg->msg_from);
+ rc = lnet_send(&msg->msg_ev.target.nid, msg,
+ &msg->msg_from);
lnet_net_lock(cpt);
/*
LASSERT(!msg->msg_receiving); /* called back recv already */
lnet_net_unlock(cpt);
- rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
+ rc = lnet_send(NULL, msg, NULL);
lnet_net_lock(cpt);
/*
}
}
+/* Add @ni to @recovery_queue unless it is already queued or still at
+ * maximum health; a reference is taken on the NI while it is on the
+ * queue. Caller must hold net_lock/0.
+ */
+void
+lnet_ni_add_to_recoveryq_locked(struct lnet_ni *ni,
+ struct list_head *recovery_queue, time64_t now)
+{
+ if (!list_empty(&ni->ni_recovery))
+ return;
+
+ if (atomic_read(&ni->ni_healthv) == LNET_MAX_HEALTH_VALUE)
+ return;
+
+ /* This NI is going on the recovery queue, so take a ref on it */
+ lnet_ni_addref_locked(ni, 0);
+
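+ /* schedule the next recovery ping for this NI */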
+ lnet_ni_set_next_ping(ni, now);
+
+ CDEBUG(D_NET, "%s added to recovery queue. ping count: %u next ping: %lld health: %d\n",
+ libcfs_nidstr(&ni->ni_nid),
+ ni->ni_ping_count,
+ ni->ni_next_ping,
+ atomic_read(&ni->ni_healthv));
+
+ list_add_tail(&ni->ni_recovery, recovery_queue);
+}
+
static void
lnet_handle_local_failure(struct lnet_ni *local_ni)
{
}
lnet_dec_healthv_locked(&local_ni->ni_healthv, lnet_health_sensitivity);
- /*
- * add the NI to the recovery queue if it's not already there
- * and it's health value is actually below the maximum. It's
- * possible that the sensitivity might be set to 0, and the health
- * value will not be reduced. In this case, there is no reason to
- * invoke recovery
- */
- if (list_empty(&local_ni->ni_recovery) &&
- atomic_read(&local_ni->ni_healthv) < LNET_MAX_HEALTH_VALUE) {
- CDEBUG(D_NET, "ni %s added to recovery queue. Health = %d\n",
- libcfs_nid2str(local_ni->ni_nid),
- atomic_read(&local_ni->ni_healthv));
- list_add_tail(&local_ni->ni_recovery,
- &the_lnet.ln_mt_localNIRecovq);
- lnet_ni_addref_locked(local_ni, 0);
- }
+ lnet_ni_add_to_recoveryq_locked(local_ni, &the_lnet.ln_mt_localNIRecovq,
+ ktime_get_seconds());
lnet_net_unlock(0);
}
+/* must hold net_lock/0 */
void
lnet_handle_remote_failure_locked(struct lnet_peer_ni *lpni)
{
__u32 sensitivity = lnet_health_sensitivity;
__u32 lp_sensitivity;
- /* lpni could be NULL if we're in the LOLND case */
- if (!lpni)
- return;
-
/*
* If there is a health sensitivity in the peer then use that
* instead of the globally set one.
* value will not be reduced. In this case, there is no reason to
* invoke recovery
*/
- lnet_peer_ni_add_to_recoveryq_locked(lpni);
+ lnet_peer_ni_add_to_recoveryq_locked(lpni,
+ &the_lnet.ln_mt_peerNIRecovq,
+ ktime_get_seconds());
}
static void
}
static void
-lnet_incr_hstats(struct lnet_msg *msg, enum lnet_msg_hstatus hstatus)
+lnet_incr_hstats(struct lnet_ni *ni, struct lnet_peer_ni *lpni,
+ enum lnet_msg_hstatus hstatus)
{
- struct lnet_ni *ni = msg->msg_txni;
- struct lnet_peer_ni *lpni = msg->msg_txpeer;
struct lnet_counters_health *health;
health = &the_lnet.ln_counters[0]->lct_health;
msg->msg_target_is_router = 0;
CDEBUG(D_NET, "%s->%s:%s:%s - queuing msg (%p) for resend\n",
- libcfs_nid2str(msg->msg_hdr.src_nid),
- libcfs_nid2str(msg->msg_hdr.dest_nid),
+ libcfs_nidstr(&msg->msg_hdr.src_nid),
+ libcfs_nidstr(&msg->msg_hdr.dest_nid),
lnet_msgtyp2str(msg->msg_type),
lnet_health_error2str(msg->msg_health_status), msg);
/* don't resend recovery messages */
if (msg->msg_recovery) {
CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n",
- libcfs_nid2str(msg->msg_from),
- libcfs_nid2str(msg->msg_target.nid),
+ libcfs_nidstr(&msg->msg_from),
+ libcfs_nidstr(&msg->msg_target.nid),
msg->msg_retry_count);
return -ENOTRECOVERABLE;
}
*/
if (msg->msg_no_resend) {
CDEBUG(D_NET, "msg %s->%s requested no resend. retry# %d\n",
- libcfs_nid2str(msg->msg_from),
- libcfs_nid2str(msg->msg_target.nid),
+ libcfs_nidstr(&msg->msg_from),
+ libcfs_nidstr(&msg->msg_target.nid),
msg->msg_retry_count);
return -ENOTRECOVERABLE;
}
/* check if the message has exceeded the number of retries */
if (msg->msg_retry_count >= lnet_retry_count) {
CNETERR("msg %s->%s exceeded retry count %d\n",
- libcfs_nid2str(msg->msg_from),
- libcfs_nid2str(msg->msg_target.nid),
+ libcfs_nidstr(&msg->msg_from),
+ libcfs_nidstr(&msg->msg_target.nid),
msg->msg_retry_count);
return -ENOTRECOVERABLE;
}
return 0;
}
- while (!list_empty(&container->msc_resending)) {
- msg = list_entry(container->msc_resending.next,
- struct lnet_msg, msg_list);
+ while ((msg = list_first_entry_or_null(&container->msc_resending,
+ struct lnet_msg,
+ msg_list)) != NULL) {
list_del(&msg->msg_list);
/*
struct lnet_peer_ni *lpni;
struct lnet_ni *ni;
bool lo = false;
+ bool attempt_local_resend;
+ bool attempt_remote_resend;
+ bool handle_local_health;
+ bool handle_remote_health;
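+ /* health updates and resend attempts are decided separately for
+ * the local and the remote side of the transfer
+ */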
/* if we're shutting down no point in handling health. */
if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
* set. So no need to sanity check it.
*/
if (msg->msg_tx_committed &&
- LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) != LOLND)
+ !nid_is_lo0(&msg->msg_txni->ni_nid))
LASSERT(msg->msg_txpeer);
else if (msg->msg_tx_committed &&
- LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) == LOLND)
+ nid_is_lo0(&msg->msg_txni->ni_nid))
lo = true;
if (hstatus != LNET_MSG_STATUS_OK &&
return -1;
/*
- * stats are only incremented for errors so avoid wasting time
- * incrementing statistics if there is no error.
- */
- if (hstatus != LNET_MSG_STATUS_OK) {
- lnet_net_lock(0);
- lnet_incr_hstats(msg, hstatus);
- lnet_net_unlock(0);
- }
-
- /*
* always prefer txni/txpeer if the message is committed for both
* directions.
*/
if (msg->msg_tx_committed) {
ni = msg->msg_txni;
lpni = msg->msg_txpeer;
+ attempt_local_resend = attempt_remote_resend = true;
} else {
ni = msg->msg_rxni;
lpni = msg->msg_rxpeer;
+ attempt_local_resend = attempt_remote_resend = false;
}
if (!lo)
LASSERT(ni);
CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
- libcfs_nid2str(ni->ni_nid),
- (lo) ? "self" : libcfs_nid2str(lpni->lpni_nid),
+ libcfs_nidstr(&ni->ni_nid),
+ (lo) ? "self" : libcfs_nidstr(&lpni->lpni_nid),
lnet_msgtyp2str(msg->msg_type),
lnet_health_error2str(hstatus));
+ /*
+ * stats are only incremented for errors so avoid wasting time
+ * incrementing statistics if there is no error. Similarly, whether to
+ * update health values or perform resends is only applicable for
+ * messages with a health status != OK.
+ */
+ if (hstatus != LNET_MSG_STATUS_OK) {
+ /* Don't further decrement the health value if a recovery
+ * message failed.
+ */
+ if (msg->msg_recovery)
+ handle_local_health = handle_remote_health = false;
+ else
+ handle_local_health = handle_remote_health = true;
+
+ /* For local failures, health/recovery/resends are not needed if
+ * I only have a single (non-lolnd) interface. NB: pb_nnis
+ * includes the lolnd interface, so a single-rail node would
+ * have pb_nnis == 2.
+ */
+ if (the_lnet.ln_ping_target->pb_nnis <= 2) {
+ handle_local_health = false;
+ attempt_local_resend = false;
+ }
+
+ lnet_net_lock(0);
+ lnet_incr_hstats(ni, lpni, hstatus);
+ /* For remote failures, health/recovery/resends are not needed
+ * if the peer only has a single interface. Special case for
+ * routers where we rely on health feature to manage route
+ * aliveness. NB: unlike pb_nnis above, lp_nnis does _not_
+ * include the lolnd, so a single-rail node would have
+ * lp_nnis == 1.
+ */
+ if (lpni && lpni->lpni_peer_net &&
+ lpni->lpni_peer_net->lpn_peer &&
+ lpni->lpni_peer_net->lpn_peer->lp_nnis <= 1) {
+ attempt_remote_resend = false;
+ if (!lnet_isrouter(lpni))
+ handle_remote_health = false;
+ }
+ /* Do not put my interfaces into peer NI recovery. They should
+ * be handled with local NI recovery.
+ */
+ if (handle_remote_health && lpni &&
+ lnet_nid_to_ni_locked(&lpni->lpni_nid, 0))
+ handle_remote_health = false;
+ lnet_net_unlock(0);
+ }
+
switch (hstatus) {
case LNET_MSG_STATUS_OK:
/*
- * increment the local ni health weather we successfully
+ * increment the local ni health whether we successfully
* received or sent a message on it.
+ *
+ * Ping counts are reset to 0 as appropriate to allow for
+ * faster recovery.
*/
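+ /* the increment step is the configured health sensitivity */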
- lnet_inc_healthv(&ni->ni_healthv);
+ lnet_inc_healthv(&ni->ni_healthv, lnet_health_sensitivity);
/*
* It's possible msg_txpeer is NULL in the LOLND
* case. Only increment the peer's health if we're
* as indication that the router is fully healthy.
*/
if (lpni && msg->msg_rx_committed) {
+ lnet_net_lock(0);
+ lpni->lpni_ping_count = 0;
+ ni->ni_ping_count = 0;
/*
* If we're receiving a message from the router or
* I'm a router, then set that lpni's health to
* maximum so we can commence communication
*/
- lnet_net_lock(0);
if (lnet_isrouter(lpni) || the_lnet.ln_routing) {
lnet_set_lpni_healthv_locked(lpni,
LNET_MAX_HEALTH_VALUE);
} else {
- lnet_inc_lpni_healthv_locked(lpni);
+ __u32 sensitivity =
+ lpni->lpni_peer_net->lpn_peer->lp_health_sensitivity;
+
+ lnet_inc_lpni_healthv_locked(lpni,
+ (sensitivity) ? sensitivity :
+ lnet_health_sensitivity);
+ /* This peer NI may have previously aged out
+ * of recovery. Now that we've received a
+ * message from it, we can continue recovery
+ * if its health value is still below the
+ * maximum.
+ */
+ lnet_peer_ni_add_to_recoveryq_locked(lpni,
+ &the_lnet.ln_mt_peerNIRecovq,
+ ktime_get_seconds());
}
lnet_net_unlock(0);
}
case LNET_MSG_STATUS_LOCAL_ABORTED:
case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
case LNET_MSG_STATUS_LOCAL_TIMEOUT:
- lnet_handle_local_failure(ni);
- if (msg->msg_tx_committed)
- /* add to the re-send queue */
+ if (handle_local_health)
+ lnet_handle_local_failure(ni);
+ if (attempt_local_resend)
return lnet_attempt_msg_resend(msg);
break;
-
- /*
- * These errors will not trigger a resend so simply
- * finalize the message
- */
case LNET_MSG_STATUS_LOCAL_ERROR:
- lnet_handle_local_failure(ni);
+ if (handle_local_health)
+ lnet_handle_local_failure(ni);
return -1;
-
- /*
- * TODO: since the remote dropped the message we can
- * attempt a resend safely.
- */
case LNET_MSG_STATUS_REMOTE_DROPPED:
- lnet_handle_remote_failure(lpni);
- if (msg->msg_tx_committed)
+ if (handle_remote_health)
+ lnet_handle_remote_failure(lpni);
+ if (attempt_remote_resend)
return lnet_attempt_msg_resend(msg);
break;
-
case LNET_MSG_STATUS_REMOTE_ERROR:
case LNET_MSG_STATUS_REMOTE_TIMEOUT:
+ if (handle_remote_health)
+ lnet_handle_remote_failure(lpni);
+ return -1;
case LNET_MSG_STATUS_NETWORK_TIMEOUT:
- lnet_handle_remote_failure(lpni);
+ if (handle_remote_health)
+ lnet_handle_remote_failure(lpni);
+ if (handle_local_health)
+ lnet_handle_local_failure(ni);
return -1;
default:
LBUG();
}
static void
-lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status)
+lnet_msg_detach_md(struct lnet_msg *msg, int status)
{
struct lnet_libmd *md = msg->msg_md;
+ lnet_handler_t handler = NULL;
+ int cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
int unlink;
+ lnet_res_lock(cpt);
+ while (md->md_flags & LNET_MD_FLAG_HANDLING)
+ /* An event handler is running - wait for it to
+ * complete to avoid races.
+ */
+ lnet_md_wait_handling(md, cpt);
+
/* Now it's safe to drop my caller's ref */
md->md_refcount--;
LASSERT(md->md_refcount >= 0);
unlink = lnet_md_unlinkable(md);
- if (md->md_eq != NULL) {
+ if (md->md_handler) {
if ((md->md_flags & LNET_MD_FLAG_ABORTED) && !status) {
msg->msg_ev.status = -ETIMEDOUT;
CDEBUG(D_NET, "md 0x%p already unlinked\n", md);
msg->msg_ev.status = status;
}
msg->msg_ev.unlinked = unlink;
- lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
+ handler = md->md_handler;
+ if (!unlink)
+ md->md_flags |= LNET_MD_FLAG_HANDLING;
}
if (unlink || (md->md_refcount == 0 &&
md->md_threshold == LNET_MD_THRESH_INF))
lnet_detach_rsp_tracker(md, cpt);
+ msg->msg_md = NULL;
if (unlink)
lnet_md_unlink(md);
- msg->msg_md = NULL;
+ lnet_res_unlock(cpt);
+
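+ /* invoke the handler outside lnet_res_lock so it can call back
+ * into LNet; LNET_MD_FLAG_HANDLING prevents the MD from being
+ * detached until the handler completes and waiters are woken
+ */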
+ if (handler) {
+ handler(&msg->msg_ev);
+ if (!unlink) {
+ lnet_res_lock(cpt);
+ md->md_flags &= ~LNET_MD_FLAG_HANDLING;
+ wake_up_var(md);
+ lnet_res_unlock(cpt);
+ }
+ }
}
static bool
return false;
/* match only health rules */
- if (!lnet_drop_rule_match(&msg->msg_hdr, LNET_NID_ANY,
- hstatus))
+ if (!lnet_drop_rule_match(&msg->msg_hdr, NULL, hstatus))
return false;
CDEBUG(D_NET, "src %s(%s)->dst %s: %s simulate health error: %s\n",
- libcfs_nid2str(msg->msg_hdr.src_nid),
- libcfs_nid2str(msg->msg_txni->ni_nid),
- libcfs_nid2str(msg->msg_hdr.dest_nid),
+ libcfs_nidstr(&msg->msg_hdr.src_nid),
+ libcfs_nidstr(&msg->msg_txni->ni_nid),
+ libcfs_nidstr(&msg->msg_hdr.dest_nid),
lnet_msgtyp2str(msg->msg_type),
lnet_health_error2str(*hstatus));
* We're not going to resend this message so detach its MD and invoke
* the appropriate callbacks
*/
- if (msg->msg_md != NULL) {
- cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
- lnet_res_lock(cpt);
- lnet_msg_detach_md(msg, cpt, status);
- lnet_res_unlock(cpt);
- }
+ if (msg->msg_md != NULL)
+ lnet_msg_detach_md(msg, status);
again:
if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
}
rc = 0;
- while (!list_empty(&container->msc_finalizing)) {
- msg = list_entry(container->msc_finalizing.next,
- struct lnet_msg, msg_list);
-
+ while ((msg = list_first_entry_or_null(&container->msc_finalizing,
+ struct lnet_msg,
+ msg_list)) != NULL) {
list_del_init(&msg->msg_list);
/* NB drops and regains the lnet lock if it actually does
void
lnet_msg_container_cleanup(struct lnet_msg_container *container)
{
- int count = 0;
+ struct lnet_msg *msg;
+ int count = 0;
if (container->msc_init == 0)
return;
- while (!list_empty(&container->msc_active)) {
- struct lnet_msg *msg;
-
- msg = list_entry(container->msc_active.next,
- struct lnet_msg, msg_activelist);
+ while ((msg = list_first_entry_or_null(&container->msc_active,
+ struct lnet_msg,
+ msg_activelist)) != NULL) {
LASSERT(msg->msg_onactivelist);
msg->msg_onactivelist = 0;
list_del_init(&msg->msg_activelist);
CERROR("%d active msg on exit\n", count);
if (container->msc_finalizers != NULL) {
- LIBCFS_FREE(container->msc_finalizers,
- container->msc_nfinalizers *
- sizeof(*container->msc_finalizers));
+ CFS_FREE_PTR_ARRAY(container->msc_finalizers,
+ container->msc_nfinalizers);
container->msc_finalizers = NULL;
}
if (container->msc_resenders != NULL) {
- LIBCFS_FREE(container->msc_resenders,
- container->msc_nfinalizers *
- sizeof(*container->msc_resenders));
+ CFS_FREE_PTR_ARRAY(container->msc_resenders,
+ container->msc_nfinalizers);
container->msc_resenders = NULL;
}
container->msc_init = 0;