X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Flnet%2Flib-msg.c;h=69a85be4977d73ca57da9ed96ea6f4ed256888da;hb=3b760208109b249fd9051d97dbc98664ca4b5769;hp=fcf021ac443fc36761336d43fc9d3d61e5f4350e;hpb=b81bcc6c6f0c54c48e908eccb13adc620582881e;p=fs%2Flustre-release.git diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index fcf021a..69a85be 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -27,7 +27,6 @@ */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. * * lnet/lnet/lib-msg.c * @@ -48,7 +47,7 @@ lnet_build_unlink_event(struct lnet_libmd *md, struct lnet_event *ev) ev->status = 0; ev->unlinked = 1; ev->type = LNET_EVENT_UNLINK; - lnet_md_deconstruct(md, &ev->md); + lnet_md_deconstruct(md, ev); lnet_md2handle(&ev->md_handle, md); EXIT; } @@ -69,13 +68,13 @@ lnet_build_msg_event(struct lnet_msg *msg, enum lnet_event_kind ev_type) if (ev_type == LNET_EVENT_SEND) { /* event for active message */ - ev->target.nid = le64_to_cpu(hdr->dest_nid); - ev->target.pid = le32_to_cpu(hdr->dest_pid); - ev->initiator.nid = LNET_NID_ANY; + ev->target.nid = hdr->dest_nid; + ev->target.pid = hdr->dest_pid; + ev->initiator.nid = LNET_ANY_NID; ev->initiator.pid = the_lnet.ln_pid; - ev->source.nid = LNET_NID_ANY; + ev->source.nid = LNET_ANY_NID; ev->source.pid = the_lnet.ln_pid; - ev->sender = LNET_NID_ANY; + ev->sender = LNET_ANY_NID; } else { /* event for passive message */ ev->target.pid = hdr->dest_pid; @@ -86,7 +85,7 @@ lnet_build_msg_event(struct lnet_msg *msg, enum lnet_event_kind ev_type) /* Multi-Rail: track source NID. */ ev->source.pid = hdr->src_pid; ev->source.nid = hdr->src_nid; - ev->rlength = hdr->payload_length; + ev->rlength = hdr->payload_length; ev->sender = msg->msg_from; ev->mlength = msg->msg_wanted; ev->offset = msg->msg_offset; @@ -360,7 +359,7 @@ lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md, /* build umd in event */ lnet_md2handle(&msg->msg_ev.md_handle, md); - lnet_md_deconstruct(md, &msg->msg_ev.md); + lnet_md_deconstruct(md, &msg->msg_ev); } static int @@ -385,15 +384,14 @@ lnet_complete_msg_locked(struct lnet_msg *msg, int cpt) ack_wmd = msg->msg_hdr.msg.put.ack_wmd; - lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.source, 0, 0); + lnet_prep_send(msg, LNET_MSG_ACK, &msg->msg_ev.source, 0, 0); msg->msg_hdr.msg.ack.dst_wmd = ack_wmd; msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits; msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength); - /* NB: we probably want to use NID of msg::msg_from as 3rd - * parameter (router NID) if it's routed message */ - rc = lnet_send(msg->msg_ev.target.nid, msg, msg->msg_from); + rc = lnet_send(&msg->msg_ev.target.nid, msg, + &msg->msg_from); lnet_net_lock(cpt); /* @@ -415,7 +413,7 @@ lnet_complete_msg_locked(struct lnet_msg *msg, int cpt) LASSERT(!msg->msg_receiving); /* called back recv already */ lnet_net_unlock(cpt); - rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY); + rc = lnet_send(NULL, msg, NULL); lnet_net_lock(cpt); /* @@ -452,6 +450,31 @@ lnet_dec_healthv_locked(atomic_t *healthv, int sensitivity) } } +/* must hold net_lock/0 */ +void +lnet_ni_add_to_recoveryq_locked(struct lnet_ni *ni, + struct list_head *recovery_queue, time64_t now) +{ + if (!list_empty(&ni->ni_recovery)) + return; + + if (atomic_read(&ni->ni_healthv) == LNET_MAX_HEALTH_VALUE) + return; + + /* This NI is going on the recovery queue, so take a ref on it */ + lnet_ni_addref_locked(ni, 0); + + lnet_ni_set_next_ping(ni, now); + + CDEBUG(D_NET, "%s added to recovery queue. ping count: %u next ping: %lld health :%d\n", + libcfs_nidstr(&ni->ni_nid), + ni->ni_ping_count, + ni->ni_next_ping, + atomic_read(&ni->ni_healthv)); + + list_add_tail(&ni->ni_recovery, recovery_queue); +} + static void lnet_handle_local_failure(struct lnet_ni *local_ni) { @@ -467,35 +490,18 @@ lnet_handle_local_failure(struct lnet_ni *local_ni) } lnet_dec_healthv_locked(&local_ni->ni_healthv, lnet_health_sensitivity); - /* - * add the NI to the recovery queue if it's not already there - * and it's health value is actually below the maximum. It's - * possible that the sensitivity might be set to 0, and the health - * value will not be reduced. In this case, there is no reason to - * invoke recovery - */ - if (list_empty(&local_ni->ni_recovery) && - atomic_read(&local_ni->ni_healthv) < LNET_MAX_HEALTH_VALUE) { - CERROR("ni %s added to recovery queue. Health = %d\n", - libcfs_nid2str(local_ni->ni_nid), - atomic_read(&local_ni->ni_healthv)); - list_add_tail(&local_ni->ni_recovery, - &the_lnet.ln_mt_localNIRecovq); - lnet_ni_addref_locked(local_ni, 0); - } + lnet_ni_add_to_recoveryq_locked(local_ni, &the_lnet.ln_mt_localNIRecovq, + ktime_get_seconds()); lnet_net_unlock(0); } +/* must hold net_lock/0 */ void lnet_handle_remote_failure_locked(struct lnet_peer_ni *lpni) { __u32 sensitivity = lnet_health_sensitivity; __u32 lp_sensitivity; - /* lpni could be NULL if we're in the LOLND case */ - if (!lpni) - return; - /* * If there is a health sensitivity in the peer then use that * instead of the globally set one. @@ -505,6 +511,10 @@ lnet_handle_remote_failure_locked(struct lnet_peer_ni *lpni) sensitivity = lp_sensitivity; lnet_dec_healthv_locked(&lpni->lpni_healthv, sensitivity); + + /* update the peer_net's health value */ + lnet_update_peer_net_healthv(lpni); + /* * add the peer NI to the recovery queue if it's not already there * and it's health value is actually below the maximum. It's @@ -512,7 +522,9 @@ lnet_handle_remote_failure_locked(struct lnet_peer_ni *lpni) * value will not be reduced. In this case, there is no reason to * invoke recovery */ - lnet_peer_ni_add_to_recoveryq_locked(lpni); + lnet_peer_ni_add_to_recoveryq_locked(lpni, + &the_lnet.ln_mt_peerNIRecovq, + ktime_get_seconds()); } static void @@ -533,10 +545,9 @@ lnet_handle_remote_failure(struct lnet_peer_ni *lpni) } static void -lnet_incr_hstats(struct lnet_msg *msg, enum lnet_msg_hstatus hstatus) +lnet_incr_hstats(struct lnet_ni *ni, struct lnet_peer_ni *lpni, + enum lnet_msg_hstatus hstatus) { - struct lnet_ni *ni = msg->msg_txni; - struct lnet_peer_ni *lpni = msg->msg_txpeer; struct lnet_counters_health *health; health = &the_lnet.ln_counters[0]->lct_health; @@ -630,8 +641,8 @@ lnet_resend_msg_locked(struct lnet_msg *msg) msg->msg_target_is_router = 0; CDEBUG(D_NET, "%s->%s:%s:%s - queuing msg (%p) for resend\n", - libcfs_nid2str(msg->msg_hdr.src_nid), - libcfs_nid2str(msg->msg_hdr.dest_nid), + libcfs_nidstr(&msg->msg_hdr.src_nid), + libcfs_nidstr(&msg->msg_hdr.dest_nid), lnet_msgtyp2str(msg->msg_type), lnet_health_error2str(msg->msg_health_status), msg); @@ -679,8 +690,8 @@ lnet_attempt_msg_resend(struct lnet_msg *msg) /* don't resend recovery messages */ if (msg->msg_recovery) { CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n", - libcfs_nid2str(msg->msg_from), - libcfs_nid2str(msg->msg_target.nid), + libcfs_nidstr(&msg->msg_from), + libcfs_nidstr(&msg->msg_target.nid), msg->msg_retry_count); return -ENOTRECOVERABLE; } @@ -691,8 +702,8 @@ lnet_attempt_msg_resend(struct lnet_msg *msg) */ if (msg->msg_no_resend) { CDEBUG(D_NET, "msg %s->%s requested no resend. retry# %d\n", - libcfs_nid2str(msg->msg_from), - libcfs_nid2str(msg->msg_target.nid), + libcfs_nidstr(&msg->msg_from), + libcfs_nidstr(&msg->msg_target.nid), msg->msg_retry_count); return -ENOTRECOVERABLE; } @@ -700,8 +711,8 @@ lnet_attempt_msg_resend(struct lnet_msg *msg) /* check if the message has exceeded the number of retries */ if (msg->msg_retry_count >= lnet_retry_count) { CNETERR("msg %s->%s exceeded retry count %d\n", - libcfs_nid2str(msg->msg_from), - libcfs_nid2str(msg->msg_target.nid), + libcfs_nidstr(&msg->msg_from), + libcfs_nidstr(&msg->msg_target.nid), msg->msg_retry_count); return -ENOTRECOVERABLE; } @@ -728,9 +739,9 @@ lnet_attempt_msg_resend(struct lnet_msg *msg) return 0; } - while (!list_empty(&container->msc_resending)) { - msg = list_entry(container->msc_resending.next, - struct lnet_msg, msg_list); + while ((msg = list_first_entry_or_null(&container->msc_resending, + struct lnet_msg, + msg_list)) != NULL) { list_del(&msg->msg_list); /* @@ -775,6 +786,10 @@ lnet_health_check(struct lnet_msg *msg) struct lnet_peer_ni *lpni; struct lnet_ni *ni; bool lo = false; + bool attempt_local_resend; + bool attempt_remote_resend; + bool handle_local_health; + bool handle_remote_health; /* if we're shutting down no point in handling health. */ if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) @@ -787,10 +802,10 @@ lnet_health_check(struct lnet_msg *msg) * set. So no need to sanity check it. */ if (msg->msg_tx_committed && - LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) != LOLND) + !nid_is_lo0(&msg->msg_txni->ni_nid)) LASSERT(msg->msg_txpeer); else if (msg->msg_tx_committed && - LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) == LOLND) + nid_is_lo0(&msg->msg_txni->ni_nid)) lo = true; if (hstatus != LNET_MSG_STATUS_OK && @@ -798,25 +813,17 @@ lnet_health_check(struct lnet_msg *msg) return -1; /* - * stats are only incremented for errors so avoid wasting time - * incrementing statistics if there is no error. - */ - if (hstatus != LNET_MSG_STATUS_OK) { - lnet_net_lock(0); - lnet_incr_hstats(msg, hstatus); - lnet_net_unlock(0); - } - - /* * always prefer txni/txpeer if they message is committed for both * directions. */ if (msg->msg_tx_committed) { ni = msg->msg_txni; lpni = msg->msg_txpeer; + attempt_local_resend = attempt_remote_resend = true; } else { ni = msg->msg_rxni; lpni = msg->msg_rxpeer; + attempt_local_resend = attempt_remote_resend = false; } if (!lo) @@ -825,18 +832,71 @@ lnet_health_check(struct lnet_msg *msg) LASSERT(ni); CDEBUG(D_NET, "health check: %s->%s: %s: %s\n", - libcfs_nid2str(ni->ni_nid), - (lo) ? "self" : libcfs_nid2str(lpni->lpni_nid), + libcfs_nidstr(&ni->ni_nid), + (lo) ? "self" : libcfs_nidstr(&lpni->lpni_nid), lnet_msgtyp2str(msg->msg_type), lnet_health_error2str(hstatus)); + /* + * stats are only incremented for errors so avoid wasting time + * incrementing statistics if there is no error. Similarly, whether to + * update health values or perform resends is only applicable for + * messages with a health status != OK. + */ + if (hstatus != LNET_MSG_STATUS_OK) { + /* Don't further decrement the health value if a recovery + * message failed. + */ + if (msg->msg_recovery) + handle_local_health = handle_remote_health = false; + else + handle_local_health = handle_remote_health = true; + + /* For local failures, health/recovery/resends are not needed if + * I only have a single (non-lolnd) interface. NB: pb_nnis + * includes the lolnd interface, so a single-rail node would + * have pb_nnis == 2. + */ + if (the_lnet.ln_ping_target->pb_nnis <= 2) { + handle_local_health = false; + attempt_local_resend = false; + } + + lnet_net_lock(0); + lnet_incr_hstats(ni, lpni, hstatus); + /* For remote failures, health/recovery/resends are not needed + * if the peer only has a single interface. Special case for + * routers where we rely on health feature to manage route + * aliveness. NB: unlike pb_nnis above, lp_nnis does _not_ + * include the lolnd, so a single-rail node would have + * lp_nnis == 1. + */ + if (lpni && lpni->lpni_peer_net && + lpni->lpni_peer_net->lpn_peer && + lpni->lpni_peer_net->lpn_peer->lp_nnis <= 1) { + attempt_remote_resend = false; + if (!lnet_isrouter(lpni)) + handle_remote_health = false; + } + /* Do not put my interfaces into peer NI recovery. They should + * be handled with local NI recovery. + */ + if (handle_remote_health && lpni && + lnet_nid_to_ni_locked(&lpni->lpni_nid, 0)) + handle_remote_health = false; + lnet_net_unlock(0); + } + switch (hstatus) { case LNET_MSG_STATUS_OK: /* - * increment the local ni health weather we successfully + * increment the local ni health whether we successfully * received or sent a message on it. + * + * Ping counts are reset to 0 as appropriate to allow for + * faster recovery. */ - lnet_inc_healthv(&ni->ni_healthv); + lnet_inc_healthv(&ni->ni_healthv, lnet_health_sensitivity); /* * It's possible msg_txpeer is NULL in the LOLND * case. Only increment the peer's health if we're @@ -846,16 +906,35 @@ lnet_health_check(struct lnet_msg *msg) * as indication that the router is fully healthy. */ if (lpni && msg->msg_rx_committed) { + lnet_net_lock(0); + lpni->lpni_ping_count = 0; + ni->ni_ping_count = 0; /* * If we're receiving a message from the router or * I'm a router, then set that lpni's health to * maximum so we can commence communication */ - if (lnet_isrouter(lpni) || the_lnet.ln_routing) - lnet_set_healthv(&lpni->lpni_healthv, - LNET_MAX_HEALTH_VALUE); - else - lnet_inc_healthv(&lpni->lpni_healthv); + if (lnet_isrouter(lpni) || the_lnet.ln_routing) { + lnet_set_lpni_healthv_locked(lpni, + LNET_MAX_HEALTH_VALUE); + } else { + __u32 sensitivity = lpni->lpni_peer_net-> + lpn_peer->lp_health_sensitivity; + + lnet_inc_lpni_healthv_locked(lpni, + (sensitivity) ? sensitivity : + lnet_health_sensitivity); + /* This peer NI may have previously aged out + * of recovery. Now that we've received a + * message from it, we can continue recovery + * if its health value is still below the + * maximum. + */ + lnet_peer_ni_add_to_recoveryq_locked(lpni, + &the_lnet.ln_mt_peerNIRecovq, + ktime_get_seconds()); + } + lnet_net_unlock(0); } /* we can finalize this message */ @@ -865,34 +944,31 @@ lnet_health_check(struct lnet_msg *msg) case LNET_MSG_STATUS_LOCAL_ABORTED: case LNET_MSG_STATUS_LOCAL_NO_ROUTE: case LNET_MSG_STATUS_LOCAL_TIMEOUT: - lnet_handle_local_failure(ni); - if (msg->msg_tx_committed) - /* add to the re-send queue */ + if (handle_local_health) + lnet_handle_local_failure(ni); + if (attempt_local_resend) return lnet_attempt_msg_resend(msg); break; - - /* - * These errors will not trigger a resend so simply - * finalize the message - */ case LNET_MSG_STATUS_LOCAL_ERROR: - lnet_handle_local_failure(ni); + if (handle_local_health) + lnet_handle_local_failure(ni); return -1; - - /* - * TODO: since the remote dropped the message we can - * attempt a resend safely. - */ case LNET_MSG_STATUS_REMOTE_DROPPED: - lnet_handle_remote_failure(lpni); - if (msg->msg_tx_committed) + if (handle_remote_health) + lnet_handle_remote_failure(lpni); + if (attempt_remote_resend) return lnet_attempt_msg_resend(msg); break; - case LNET_MSG_STATUS_REMOTE_ERROR: case LNET_MSG_STATUS_REMOTE_TIMEOUT: + if (handle_remote_health) + lnet_handle_remote_failure(lpni); + return -1; case LNET_MSG_STATUS_NETWORK_TIMEOUT: - lnet_handle_remote_failure(lpni); + if (handle_remote_health) + lnet_handle_remote_failure(lpni); + if (handle_local_health) + lnet_handle_local_failure(ni); return -1; default: LBUG(); @@ -903,17 +979,26 @@ lnet_health_check(struct lnet_msg *msg) } static void -lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status) +lnet_msg_detach_md(struct lnet_msg *msg, int status) { struct lnet_libmd *md = msg->msg_md; + lnet_handler_t handler = NULL; + int cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie); int unlink; + lnet_res_lock(cpt); + while (md->md_flags & LNET_MD_FLAG_HANDLING) + /* An event handler is running - wait for it to + * complete to avoid races. + */ + lnet_md_wait_handling(md, cpt); + /* Now it's safe to drop my caller's ref */ md->md_refcount--; LASSERT(md->md_refcount >= 0); unlink = lnet_md_unlinkable(md); - if (md->md_eq != NULL) { + if (md->md_handler) { if ((md->md_flags & LNET_MD_FLAG_ABORTED) && !status) { msg->msg_ev.status = -ETIMEDOUT; CDEBUG(D_NET, "md 0x%p already unlinked\n", md); @@ -921,17 +1006,30 @@ lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status) msg->msg_ev.status = status; } msg->msg_ev.unlinked = unlink; - lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev); + handler = md->md_handler; + if (!unlink) + md->md_flags |= LNET_MD_FLAG_HANDLING; } if (unlink || (md->md_refcount == 0 && md->md_threshold == LNET_MD_THRESH_INF)) lnet_detach_rsp_tracker(md, cpt); + msg->msg_md = NULL; if (unlink) lnet_md_unlink(md); - msg->msg_md = NULL; + lnet_res_unlock(cpt); + + if (handler) { + handler(&msg->msg_ev); + if (!unlink) { + lnet_res_lock(cpt); + md->md_flags &= ~LNET_MD_FLAG_HANDLING; + wake_up_var(md); + lnet_res_unlock(cpt); + } + } } static bool @@ -1016,14 +1114,13 @@ lnet_send_error_simulation(struct lnet_msg *msg, return false; /* match only health rules */ - if (!lnet_drop_rule_match(&msg->msg_hdr, LNET_NID_ANY, - hstatus)) + if (!lnet_drop_rule_match(&msg->msg_hdr, NULL, hstatus)) return false; CDEBUG(D_NET, "src %s(%s)->dst %s: %s simulate health error: %s\n", - libcfs_nid2str(msg->msg_hdr.src_nid), - libcfs_nid2str(msg->msg_txni->ni_nid), - libcfs_nid2str(msg->msg_hdr.dest_nid), + libcfs_nidstr(&msg->msg_hdr.src_nid), + libcfs_nidstr(&msg->msg_txni->ni_nid), + libcfs_nidstr(&msg->msg_hdr.dest_nid), lnet_msgtyp2str(msg->msg_type), lnet_health_error2str(*hstatus)); @@ -1067,12 +1164,8 @@ lnet_finalize(struct lnet_msg *msg, int status) * We're not going to resend this message so detach its MD and invoke * the appropriate callbacks */ - if (msg->msg_md != NULL) { - cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie); - lnet_res_lock(cpt); - lnet_msg_detach_md(msg, cpt, status); - lnet_res_unlock(cpt); - } + if (msg->msg_md != NULL) + lnet_msg_detach_md(msg, status); again: if (!msg->msg_tx_committed && !msg->msg_rx_committed) { @@ -1106,10 +1199,9 @@ again: } rc = 0; - while (!list_empty(&container->msc_finalizing)) { - msg = list_entry(container->msc_finalizing.next, - struct lnet_msg, msg_list); - + while ((msg = list_first_entry_or_null(&container->msc_finalizing, + struct lnet_msg, + msg_list)) != NULL) { list_del_init(&msg->msg_list); /* NB drops and regains the lnet lock if it actually does @@ -1136,16 +1228,15 @@ EXPORT_SYMBOL(lnet_finalize); void lnet_msg_container_cleanup(struct lnet_msg_container *container) { - int count = 0; + struct lnet_msg *msg; + int count = 0; if (container->msc_init == 0) return; - while (!list_empty(&container->msc_active)) { - struct lnet_msg *msg; - - msg = list_entry(container->msc_active.next, - struct lnet_msg, msg_activelist); + while ((msg = list_first_entry_or_null(&container->msc_active, + struct lnet_msg, + msg_activelist)) != NULL) { LASSERT(msg->msg_onactivelist); msg->msg_onactivelist = 0; list_del_init(&msg->msg_activelist); @@ -1157,16 +1248,14 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container) CERROR("%d active msg on exit\n", count); if (container->msc_finalizers != NULL) { - LIBCFS_FREE(container->msc_finalizers, - container->msc_nfinalizers * - sizeof(*container->msc_finalizers)); + CFS_FREE_PTR_ARRAY(container->msc_finalizers, + container->msc_nfinalizers); container->msc_finalizers = NULL; } if (container->msc_resenders != NULL) { - LIBCFS_FREE(container->msc_resenders, - container->msc_nfinalizers * - sizeof(*container->msc_resenders)); + CFS_FREE_PTR_ARRAY(container->msc_resenders, + container->msc_nfinalizers); container->msc_resenders = NULL; } container->msc_init = 0;