X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Flnet%2Flib-msg.c;h=8ab819416320b0e7f72203ab34a20b8e137d0383;hp=5f988bef5bf4efee78a2e783b2d14750636add29;hb=c5381d73b1d83deed561456bc476d63696b9af16;hpb=15020fd977af68620e862ad999eaab17688933e2

diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c
index 5f988be..8ab8194 100644
--- a/lnet/lnet/lib-msg.c
+++ b/lnet/lnet/lib-msg.c
@@ -48,7 +48,7 @@ lnet_build_unlink_event(struct lnet_libmd *md, struct lnet_event *ev)
 	ev->status = 0;
 	ev->unlinked = 1;
 	ev->type = LNET_EVENT_UNLINK;
-	lnet_md_deconstruct(md, &ev->md);
+	lnet_md_deconstruct(md, ev);
 	lnet_md2handle(&ev->md_handle, md);
 	EXIT;
 }
@@ -142,7 +142,7 @@ void
 lnet_msg_commit(struct lnet_msg *msg, int cpt)
 {
 	struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
-	struct lnet_counters *counters = the_lnet.ln_counters[cpt];
+	struct lnet_counters_common *common;
 	s64 timeout_ns;

 	/* set the message deadline */
@@ -171,30 +171,31 @@ lnet_msg_commit(struct lnet_msg *msg, int cpt)
 	msg->msg_onactivelist = 1;
 	list_add_tail(&msg->msg_activelist, &container->msc_active);

-	counters->msgs_alloc++;
-	if (counters->msgs_alloc > counters->msgs_max)
-		counters->msgs_max = counters->msgs_alloc;
+	common = &the_lnet.ln_counters[cpt]->lct_common;
+	common->lcc_msgs_alloc++;
+	if (common->lcc_msgs_alloc > common->lcc_msgs_max)
+		common->lcc_msgs_max = common->lcc_msgs_alloc;
 }

 static void
 lnet_msg_decommit_tx(struct lnet_msg *msg, int status)
 {
-	struct lnet_counters *counters;
+	struct lnet_counters_common *common;
 	struct lnet_event *ev = &msg->msg_ev;

 	LASSERT(msg->msg_tx_committed);
 	if (status != 0)
 		goto out;

-	counters = the_lnet.ln_counters[msg->msg_tx_cpt];
+	common = &(the_lnet.ln_counters[msg->msg_tx_cpt]->lct_common);
 	switch (ev->type) {
 	default: /* routed message */
 		LASSERT(msg->msg_routing);
 		LASSERT(msg->msg_rx_committed);
 		LASSERT(ev->type == 0);

-		counters->route_length += msg->msg_len;
-		counters->route_count++;
+		common->lcc_route_length += msg->msg_len;
+		common->lcc_route_count++;
 		goto incr_stats;

 	case LNET_EVENT_PUT:
@@ -208,7 +209,7 @@ lnet_msg_decommit_tx(struct lnet_msg *msg, int status)
 	case LNET_EVENT_SEND:
 		LASSERT(!msg->msg_rx_committed);
 		if (msg->msg_type == LNET_MSG_PUT)
-			counters->send_length += msg->msg_len;
+			common->lcc_send_length += msg->msg_len;
 		break;

 	case LNET_EVENT_GET:
@@ -220,7 +221,7 @@ lnet_msg_decommit_tx(struct lnet_msg *msg, int status)
 		break;
 	}

-	counters->send_count++;
+	common->lcc_send_count++;

 incr_stats:
 	if (msg->msg_txpeer)
@@ -239,7 +240,7 @@ incr_stats:
 static void
 lnet_msg_decommit_rx(struct lnet_msg *msg, int status)
 {
-	struct lnet_counters *counters;
+	struct lnet_counters_common *common;
 	struct lnet_event *ev = &msg->msg_ev;

 	LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */
@@ -248,7 +249,7 @@ lnet_msg_decommit_rx(struct lnet_msg *msg, int status)
 	if (status != 0)
 		goto out;

-	counters = the_lnet.ln_counters[msg->msg_rx_cpt];
+	common = &(the_lnet.ln_counters[msg->msg_rx_cpt]->lct_common);
 	switch (ev->type) {
 	default:
 		LASSERT(ev->type == 0);
@@ -266,7 +267,7 @@ lnet_msg_decommit_rx(struct lnet_msg *msg, int status)
 		 * lnet_msg_decommit_tx(), see details in lnet_parse_get() */
 		LASSERT(msg->msg_type == LNET_MSG_REPLY ||
 			msg->msg_type == LNET_MSG_GET);
-		counters->send_length += msg->msg_wanted;
+		common->lcc_send_length += msg->msg_wanted;
 		break;

 	case LNET_EVENT_PUT:
@@ -281,7 +282,7 @@ lnet_msg_decommit_rx(struct lnet_msg *msg, int status)
 		break;
 	}

-	counters->recv_count++;
+	common->lcc_recv_count++;

 incr_stats:
 	if (msg->msg_rxpeer)
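For reference, a minimal sketch of the counters layout the hunks above migrate to. The field and member names (lct_common, lcc_*) are taken from the patch itself; the exact ordering and types in the real LNet headers may differ, so treat this as an assumption-labelled illustration rather than the actual definition:

/* Sketch only: the flat struct lnet_counters is split into sub-structs
 * so common message accounting and health accounting can be kept and
 * reported independently. */
struct lnet_counters_common {
	__u32 lcc_msgs_alloc;	/* currently committed messages */
	__u32 lcc_msgs_max;	/* high-water mark */
	__u32 lcc_send_count;
	__u32 lcc_recv_count;
	__u32 lcc_route_count;
	__u64 lcc_send_length;
	__u64 lcc_recv_length;
	__u64 lcc_route_length;
};

struct lnet_counters {
	struct lnet_counters_common lct_common;
	struct lnet_counters_health lct_health; /* see lnet_incr_hstats() below */
};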
@@ -293,7 +294,7 @@ incr_stats:
 			       msg->msg_type,
 			       LNET_STATS_TYPE_RECV);
 	if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
-		counters->recv_length += msg->msg_wanted;
+		common->lcc_recv_length += msg->msg_wanted;

 out:
 	lnet_return_rx_credits_locked(msg);
@@ -326,7 +327,7 @@ lnet_msg_decommit(struct lnet_msg *msg, int cpt, int status)
 	list_del(&msg->msg_activelist);
 	msg->msg_onactivelist = 0;

-	the_lnet.ln_counters[cpt2]->msgs_alloc--;
+	the_lnet.ln_counters[cpt2]->lct_common.lcc_msgs_alloc--;

 	if (cpt2 != cpt) {
 		lnet_net_unlock(cpt2);
@@ -359,30 +360,7 @@ lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md,

 	/* build umd in event */
 	lnet_md2handle(&msg->msg_ev.md_handle, md);
-	lnet_md_deconstruct(md, &msg->msg_ev.md);
-}
-
-void
-lnet_msg_detach_md(struct lnet_msg *msg, int status)
-{
-	struct lnet_libmd *md = msg->msg_md;
-	int unlink;
-
-	/* Now it's safe to drop my caller's ref */
-	md->md_refcount--;
-	LASSERT(md->md_refcount >= 0);
-
-	unlink = lnet_md_unlinkable(md);
-	if (md->md_eq != NULL) {
-		msg->msg_ev.status = status;
-		msg->msg_ev.unlinked = unlink;
-		lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
-	}
-
-	if (unlink)
-		lnet_md_unlink(md);
-
-	msg->msg_md = NULL;
+	lnet_md_deconstruct(md, &msg->msg_ev);
 }

 static int
@@ -413,9 +391,7 @@ lnet_complete_msg_locked(struct lnet_msg *msg, int cpt)
 		msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
 		msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);

-		/* NB: we probably want to use NID of msg::msg_from as 3rd
-		 * parameter (router NID) if it's routed message */
-		rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
+		rc = lnet_send(msg->msg_ev.target.nid, msg, msg->msg_from);

 		lnet_net_lock(cpt);
 		/*
@@ -462,25 +438,21 @@ lnet_complete_msg_locked(struct lnet_msg *msg, int cpt)
 }

 static void
-lnet_dec_healthv_locked(atomic_t *healthv)
+lnet_dec_healthv_locked(atomic_t *healthv, int sensitivity)
 {
 	int h = atomic_read(healthv);

-	if (h < lnet_health_sensitivity) {
+	if (h < sensitivity) {
 		atomic_set(healthv, 0);
 	} else {
-		h -= lnet_health_sensitivity;
+		h -= sensitivity;
 		atomic_set(healthv, h);
 	}
 }

 static void
-lnet_handle_local_failure(struct lnet_msg *msg)
+lnet_handle_local_failure(struct lnet_ni *local_ni)
 {
-	struct lnet_ni *local_ni;
-
-	local_ni = msg->msg_txni;
-
 	/*
 	 * the lnet_net_lock(0) is used to protect the addref on the ni
 	 * and the recovery queue.
@@ -492,7 +464,7 @@ lnet_handle_local_failure(struct lnet_msg *msg)
 		return;
 	}

-	lnet_dec_healthv_locked(&local_ni->ni_healthv);
+	lnet_dec_healthv_locked(&local_ni->ni_healthv, lnet_health_sensitivity);
 	/*
 	 * add the NI to the recovery queue if it's not already there
 	 * and it's health value is actually below the maximum. It's
@@ -502,7 +474,7 @@ lnet_handle_local_failure(struct lnet_msg *msg)
 	 */
 	if (list_empty(&local_ni->ni_recovery) &&
 	    atomic_read(&local_ni->ni_healthv) < LNET_MAX_HEALTH_VALUE) {
-		CERROR("ni %s added to recovery queue. Health = %d\n",
+		CDEBUG(D_NET, "ni %s added to recovery queue. Health = %d\n",
 		       libcfs_nid2str(local_ni->ni_nid),
 		       atomic_read(&local_ni->ni_healthv));
 		list_add_tail(&local_ni->ni_recovery,
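The two-argument lnet_dec_healthv_locked() above clamps the health value at zero rather than letting it go negative. A userspace analogue of that semantics, a sketch assuming C11 atomics in place of the kernel's atomic_t; note the read-then-store pair is not atomic as a whole, which is why the real function runs under lnet_net_lock(0) and carries the _locked suffix:

#include <stdatomic.h>

/* decrement healthv by sensitivity, never dropping below zero */
static void dec_healthv(atomic_int *healthv, int sensitivity)
{
	int h = atomic_load(healthv);

	if (h < sensitivity)
		atomic_store(healthv, 0);
	else
		atomic_store(healthv, h - sensitivity);
}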
@@ -512,19 +484,33 @@ lnet_handle_local_failure(struct lnet_msg *msg)
 	lnet_net_unlock(0);
 }

-static void
-lnet_handle_remote_failure(struct lnet_msg *msg)
+void
+lnet_handle_remote_failure_locked(struct lnet_peer_ni *lpni)
 {
-	struct lnet_peer_ni *lpni;
+	__u32 sensitivity = lnet_health_sensitivity;
+	__u32 lp_sensitivity;

-	lpni = msg->msg_txpeer;
-
-	/* lpni could be NULL if we're in the LOLND case */
+	/*
+	 * NO-OP if:
+	 * 1. lpni could be NULL if we're in the LOLND case
+	 * 2. this is a recovery message
+	 */
 	if (!lpni)
 		return;

-	lnet_net_lock(0);
-	lnet_dec_healthv_locked(&lpni->lpni_healthv);
+	/*
+	 * If there is a health sensitivity in the peer then use that
+	 * instead of the globally set one.
+	 */
+	lp_sensitivity = lpni->lpni_peer_net->lpn_peer->lp_health_sensitivity;
+	if (lp_sensitivity)
+		sensitivity = lp_sensitivity;
+
+	lnet_dec_healthv_locked(&lpni->lpni_healthv, sensitivity);
+
+	/* update the peer_net's health value */
+	lnet_update_peer_net_healthv(lpni);
+
 	/*
 	 * add the peer NI to the recovery queue if it's not already there
 	 * and it's health value is actually below the maximum. It's
@@ -533,60 +519,77 @@ lnet_handle_remote_failure(struct lnet_msg *msg)
 	 * invoke recovery
 	 */
 	lnet_peer_ni_add_to_recoveryq_locked(lpni);
+}
+
+static void
+lnet_handle_remote_failure(struct lnet_peer_ni *lpni)
+{
+	/* lpni could be NULL if we're in the LOLND case */
+	if (!lpni)
+		return;
+
+	lnet_net_lock(0);
+	/* the mt could've shutdown and cleaned up the queues */
+	if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
+		lnet_net_unlock(0);
+		return;
+	}
+	lnet_handle_remote_failure_locked(lpni);
 	lnet_net_unlock(0);
 }

 static void
-lnet_incr_hstats(struct lnet_msg *msg, enum lnet_msg_hstatus hstatus)
+lnet_incr_hstats(struct lnet_ni *ni, struct lnet_peer_ni *lpni,
+		 enum lnet_msg_hstatus hstatus)
 {
-	struct lnet_ni *ni = msg->msg_txni;
-	struct lnet_peer_ni *lpni = msg->msg_txpeer;
-	struct lnet_counters *counters = the_lnet.ln_counters[0];
+	struct lnet_counters_health *health;
+
+	health = &the_lnet.ln_counters[0]->lct_health;

 	switch (hstatus) {
 	case LNET_MSG_STATUS_LOCAL_INTERRUPT:
 		atomic_inc(&ni->ni_hstats.hlt_local_interrupt);
-		counters->local_interrupt_count++;
+		health->lch_local_interrupt_count++;
 		break;
 	case LNET_MSG_STATUS_LOCAL_DROPPED:
 		atomic_inc(&ni->ni_hstats.hlt_local_dropped);
-		counters->local_dropped_count++;
+		health->lch_local_dropped_count++;
 		break;
 	case LNET_MSG_STATUS_LOCAL_ABORTED:
 		atomic_inc(&ni->ni_hstats.hlt_local_aborted);
-		counters->local_aborted_count++;
+		health->lch_local_aborted_count++;
 		break;
 	case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
 		atomic_inc(&ni->ni_hstats.hlt_local_no_route);
-		counters->local_no_route_count++;
+		health->lch_local_no_route_count++;
 		break;
 	case LNET_MSG_STATUS_LOCAL_TIMEOUT:
 		atomic_inc(&ni->ni_hstats.hlt_local_timeout);
-		counters->local_timeout_count++;
+		health->lch_local_timeout_count++;
 		break;
 	case LNET_MSG_STATUS_LOCAL_ERROR:
 		atomic_inc(&ni->ni_hstats.hlt_local_error);
-		counters->local_error_count++;
+		health->lch_local_error_count++;
 		break;
 	case LNET_MSG_STATUS_REMOTE_DROPPED:
 		if (lpni)
 			atomic_inc(&lpni->lpni_hstats.hlt_remote_dropped);
-		counters->remote_dropped_count++;
+		health->lch_remote_dropped_count++;
 		break;
 	case LNET_MSG_STATUS_REMOTE_ERROR:
 		if (lpni)
 			atomic_inc(&lpni->lpni_hstats.hlt_remote_error);
-		counters->remote_error_count++;
+		health->lch_remote_error_count++;
 		break;
 	case LNET_MSG_STATUS_REMOTE_TIMEOUT:
 		if (lpni)
 			atomic_inc(&lpni->lpni_hstats.hlt_remote_timeout);
-		counters->remote_timeout_count++;
+		health->lch_remote_timeout_count++;
 		break;
 	case LNET_MSG_STATUS_NETWORK_TIMEOUT:
 		if (lpni)
 			atomic_inc(&lpni->lpni_hstats.hlt_network_timeout);
-		counters->network_timeout_count++;
+		health->lch_network_timeout_count++;
 		break;
 	case LNET_MSG_STATUS_OK:
 		break;
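lnet_handle_remote_failure_locked() above prefers a per-peer health sensitivity over the global module parameter, treating 0 as "not set". A condensed sketch of just that selection, using the same field chain the hunk dereferences; the helper name is illustrative:

/* Sketch only: pick the effective sensitivity for a peer NI, falling
 * back to the global lnet_health_sensitivity tunable. */
static __u32 effective_sensitivity(struct lnet_peer_ni *lpni)
{
	__u32 lp_sensitivity =
		lpni->lpni_peer_net->lpn_peer->lp_health_sensitivity;

	return lp_sensitivity ? lp_sensitivity : lnet_health_sensitivity;
}

The unlocked wrapper re-checks the monitor-thread state under lnet_net_lock(0) before touching anything, since the monitor thread may already have shut down and cleaned up the recovery queues.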
@@ -595,6 +598,174 @@ lnet_incr_hstats(struct lnet_msg *msg, enum lnet_msg_hstatus hstatus)
 	}
 }

+static void
+lnet_resend_msg_locked(struct lnet_msg *msg)
+{
+	msg->msg_retry_count++;
+
+	/*
+	 * remove message from the active list and reset it to prepare
+	 * for a resend. Two exceptions to this
+	 *
+	 * 1. the router case. When a message is being routed it is
+	 * committed for rx when received and committed for tx when
+	 * forwarded. We don't want to remove it from the active list, since
+	 * code which handles receiving expects it to remain on the active
+	 * list.
+	 *
+	 * 2. The REPLY case. Reply messages use the same message
+	 * structure for the GET that was received.
+	 */
+	if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) {
+		list_del_init(&msg->msg_activelist);
+		msg->msg_onactivelist = 0;
+	}
+	/*
+	 * The msg_target.nid which was originally set
+	 * when calling LNetGet() or LNetPut() might've
+	 * been overwritten if we're routing this message.
+	 * Call lnet_msg_decommit_tx() to return the credit
+	 * this message consumed. The message will
+	 * consume another credit when it gets resent.
+	 */
+	msg->msg_target.nid = msg->msg_hdr.dest_nid;
+	lnet_msg_decommit_tx(msg, -EAGAIN);
+	msg->msg_sending = 0;
+	msg->msg_receiving = 0;
+	msg->msg_target_is_router = 0;
+
+	CDEBUG(D_NET, "%s->%s:%s:%s - queuing msg (%p) for resend\n",
+	       libcfs_nid2str(msg->msg_hdr.src_nid),
+	       libcfs_nid2str(msg->msg_hdr.dest_nid),
+	       lnet_msgtyp2str(msg->msg_type),
+	       lnet_health_error2str(msg->msg_health_status), msg);
+
+	list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]);
+
+	complete(&the_lnet.ln_mt_wait_complete);
+}
+
+int
+lnet_check_finalize_recursion_locked(struct lnet_msg *msg,
+				     struct list_head *containerq,
+				     int nworkers, void **workers)
+{
+	int my_slot = -1;
+	int i;
+
+	list_add_tail(&msg->msg_list, containerq);
+
+	for (i = 0; i < nworkers; i++) {
+		if (workers[i] == current)
+			break;
+
+		if (my_slot < 0 && workers[i] == NULL)
+			my_slot = i;
+	}
+
+	if (i < nworkers || my_slot < 0)
+		return -1;
+
+	workers[my_slot] = current;
+
+	return my_slot;
+}
+
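lnet_check_finalize_recursion_locked() factors out the slot-claiming pattern previously open-coded in lnet_finalize(). The message is queued first, so a thread that is already draining (its task pointer is in the workers array) can return immediately and leave the message to the thread that owns a slot; this bounds recursion when completing one message triggers another completion. A standalone sketch of the pattern, with a generic pointer standing in for the kernel's current:

/* Sketch only: claim a worker slot, or report that this thread (or
 * enough other threads) is already draining the queue.  Must be
 * called with the queue lock held so the scan-and-claim cannot race. */
static int claim_worker_slot(void **workers, int nworkers, void *me)
{
	int my_slot = -1;
	int i;

	for (i = 0; i < nworkers; i++) {
		if (workers[i] == me)
			return -1;	/* already working: recursion */
		if (my_slot < 0 && workers[i] == NULL)
			my_slot = i;	/* remember first free slot */
	}

	if (my_slot < 0)
		return -1;		/* all slots busy */

	workers[my_slot] = me;		/* claim it */
	return my_slot;
}

The caller drains the queue and then clears workers[my_slot], exactly as lnet_attempt_msg_resend() below does with msc_resenders.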
+int
+lnet_attempt_msg_resend(struct lnet_msg *msg)
+{
+	struct lnet_msg_container *container;
+	int my_slot;
+	int cpt;
+
+	/* we can only resend tx_committed messages */
+	LASSERT(msg->msg_tx_committed);
+
+	/* don't resend recovery messages */
+	if (msg->msg_recovery) {
+		CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n",
+		       libcfs_nid2str(msg->msg_from),
+		       libcfs_nid2str(msg->msg_target.nid),
+		       msg->msg_retry_count);
+		return -ENOTRECOVERABLE;
+	}
+
+	/*
+	 * if we explicitly indicated we don't want to resend then just
+	 * return
+	 */
+	if (msg->msg_no_resend) {
+		CDEBUG(D_NET, "msg %s->%s requested no resend. retry# %d\n",
+		       libcfs_nid2str(msg->msg_from),
+		       libcfs_nid2str(msg->msg_target.nid),
+		       msg->msg_retry_count);
+		return -ENOTRECOVERABLE;
+	}
+
+	/* check if the message has exceeded the number of retries */
+	if (msg->msg_retry_count >= lnet_retry_count) {
+		CNETERR("msg %s->%s exceeded retry count %d\n",
+			libcfs_nid2str(msg->msg_from),
+			libcfs_nid2str(msg->msg_target.nid),
+			msg->msg_retry_count);
+		return -ENOTRECOVERABLE;
+	}
+
+	cpt = msg->msg_tx_cpt;
+	lnet_net_lock(cpt);
+
+	/* check again under lock */
+	if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
+		lnet_net_unlock(cpt);
+		return -ESHUTDOWN;
+	}
+
+	container = the_lnet.ln_msg_containers[cpt];
+	my_slot =
+		lnet_check_finalize_recursion_locked(msg,
+						     &container->msc_resending,
+						     container->msc_nfinalizers,
+						     container->msc_resenders);
+
+	/* enough threads are resending */
+	if (my_slot == -1) {
+		lnet_net_unlock(cpt);
+		return 0;
+	}
+
+	while (!list_empty(&container->msc_resending)) {
+		msg = list_entry(container->msc_resending.next,
+				 struct lnet_msg, msg_list);
+		list_del(&msg->msg_list);
+
+		/*
+		 * resending the message will require us to call
+		 * lnet_msg_decommit_tx() which will return the credit
+		 * which this message holds. This could trigger another
+		 * queued message to be sent. If that message fails and
+		 * requires a resend we will recurse.
+		 * But since at this point the slot is taken, the message
+		 * will be queued in the container and dealt with
+		 * later. This breaks the recursion.
+		 */
+		lnet_resend_msg_locked(msg);
+	}
+
+	/*
+	 * msc_resenders is an array of process pointers. Each entry holds
+	 * a pointer to the current process operating on the message. An
+	 * array entry is created per CPT. If the array slot is already
+	 * set, then it means that there is a thread on the CPT currently
+	 * resending a message.
+	 * Once the thread finishes clear the slot to enable the thread to
+	 * take on more resend work.
+	 */
+	container->msc_resenders[my_slot] = NULL;
+	lnet_net_unlock(cpt);
+
+	return 0;
+}
+
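A note on the retry budget above: msg_retry_count is incremented inside lnet_resend_msg_locked() and tested with >= before each new resend is queued, so a message is transmitted at most lnet_retry_count + 1 times in total (the original send plus the resends). A minimal sketch of that gate, with illustrative parameter names:

#include <stdbool.h>

/* Sketch only: with retry_limit == 2 the sequence is
 * send (count 0) -> resend (count 1) -> resend (count 2) -> give up,
 * i.e. at most retry_limit resends after the original transmission. */
static bool may_resend(unsigned int msg_retry_count,
		       unsigned int retry_limit /* lnet_retry_count */)
{
	return msg_retry_count < retry_limit;
}

The return values form the contract with the health-check path below: 0 means the message was queued for resend (or left to the thread holding the resend slot) and must not be finalized, while -ENOTRECOVERABLE and -ESHUTDOWN tell the caller to give up and finalize.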
 /*
  * Do a health check on the message:
  * return -1 if we're not going to handle the error or
@@ -606,21 +777,28 @@ static int
 lnet_health_check(struct lnet_msg *msg)
 {
 	enum lnet_msg_hstatus hstatus = msg->msg_health_status;
+	struct lnet_peer_ni *lpni;
+	struct lnet_ni *ni;
 	bool lo = false;
+	bool attempt_local_resend;
+	bool attempt_remote_resend;
+	bool handle_local_health;
+	bool handle_remote_health;

 	/* if we're shutting down no point in handling health. */
-	if (the_lnet.ln_state != LNET_STATE_RUNNING)
+	if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
 		return -1;

-	LASSERT(msg->msg_txni);
+	LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);

 	/*
 	 * if we're sending to the LOLND then the msg_txpeer will not be
 	 * set. So no need to sanity check it.
 	 */
-	if (LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) != LOLND)
+	if (msg->msg_tx_committed && msg->msg_txni->ni_nid != LNET_NID_LO_0)
 		LASSERT(msg->msg_txpeer);
-	else
+	else if (msg->msg_tx_committed &&
+		 msg->msg_txni->ni_nid == LNET_NID_LO_0)
 		lo = true;

 	if (hstatus != LNET_MSG_STATUS_OK &&
@@ -628,30 +806,104 @@ lnet_health_check(struct lnet_msg *msg)
 		return -1;

 	/*
+	 * always prefer txni/txpeer if the message is committed for both
+	 * directions.
+	 */
+	if (msg->msg_tx_committed) {
+		ni = msg->msg_txni;
+		lpni = msg->msg_txpeer;
+		attempt_local_resend = attempt_remote_resend = true;
+	} else {
+		ni = msg->msg_rxni;
+		lpni = msg->msg_rxpeer;
+		attempt_local_resend = attempt_remote_resend = false;
+	}
+
+	/* Don't further decrement the health value if a recovery message
+	 * failed.
+	 */
+	if (msg->msg_recovery)
+		handle_local_health = handle_remote_health = false;
+	else
+		handle_local_health = handle_remote_health = true;
+
+	/* For local failures, health/recovery/resends are not needed if I only
+	 * have a single (non-lolnd) interface. NB: pb_nnis includes the lolnd
+	 * interface, so a single-rail node would have pb_nnis == 2.
+	 */
+	if (the_lnet.ln_ping_target->pb_nnis <= 2) {
+		handle_local_health = false;
+		attempt_local_resend = false;
+	}
+
+	/* For remote failures, health/recovery/resends are not needed if the
+	 * peer only has a single interface. Special case for routers where we
+	 * rely on health feature to manage route aliveness. NB: unlike pb_nnis
+	 * above, lp_nnis does _not_ include the lolnd, so a single-rail node
+	 * would have lp_nnis == 1.
+	 */
+	if (lpni && lpni->lpni_peer_net->lpn_peer->lp_nnis <= 1) {
+		attempt_remote_resend = false;
+		if (!lnet_isrouter(lpni))
+			handle_remote_health = false;
+	}
+
+	if (!lo)
+		LASSERT(ni && lpni);
+	else
+		LASSERT(ni);
+
+	CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
+	       libcfs_nid2str(ni->ni_nid),
+	       (lo) ? "self" : libcfs_nid2str(lpni->lpni_nid),
+	       lnet_msgtyp2str(msg->msg_type),
+	       lnet_health_error2str(hstatus));
+
+	/*
 	 * stats are only incremented for errors so avoid wasting time
 	 * incrementing statistics if there is no error.
 	 */
 	if (hstatus != LNET_MSG_STATUS_OK) {
 		lnet_net_lock(0);
-		lnet_incr_hstats(msg, hstatus);
+		lnet_incr_hstats(ni, lpni, hstatus);
 		lnet_net_unlock(0);
 	}

-	CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
-	       libcfs_nid2str(msg->msg_txni->ni_nid),
-	       (lo) ? "self" : libcfs_nid2str(msg->msg_txpeer->lpni_nid),
-	       lnet_msgtyp2str(msg->msg_type),
-	       lnet_health_error2str(hstatus));
-
 	switch (hstatus) {
 	case LNET_MSG_STATUS_OK:
-		lnet_inc_healthv(&msg->msg_txni->ni_healthv);
+		/*
+		 * increment the local ni health whether we successfully
+		 * received or sent a message on it.
+		 */
+		lnet_inc_healthv(&ni->ni_healthv, lnet_health_sensitivity);
 		/*
 		 * It's possible msg_txpeer is NULL in the LOLND
-		 * case.
+		 * case. Only increment the peer's health if we're
+		 * receiving a message from it. It's the only sure way to
+		 * know that a remote interface is up.
+		 * If this interface is part of a router, then take that
+		 * as indication that the router is fully healthy.
 		 */
-		if (msg->msg_txpeer)
-			lnet_inc_healthv(&msg->msg_txpeer->lpni_healthv);
+		if (lpni && msg->msg_rx_committed) {
+			/*
+			 * If we're receiving a message from the router or
+			 * I'm a router, then set that lpni's health to
+			 * maximum so we can commence communication
+			 */
+			lnet_net_lock(0);
+			if (lnet_isrouter(lpni) || the_lnet.ln_routing) {
+				lnet_set_lpni_healthv_locked(lpni,
+						LNET_MAX_HEALTH_VALUE);
+			} else {
+				__u32 sensitivity = lpni->lpni_peer_net->
+					lpn_peer->lp_health_sensitivity;
+
+				lnet_inc_lpni_healthv_locked(lpni,
+					(sensitivity) ? sensitivity :
+					lnet_health_sensitivity);
+			}
+			lnet_net_unlock(0);
+		}

 		/* we can finalize this message */
 		return -1;
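The block above reduces to a small decision matrix: resends only make sense on the send path, recovery pings never dent health further, and single-rail configurations skip health handling entirely, with a router exception on the remote side. A pure-function restatement under those assumptions; input names are illustrative and the thresholds mirror the pb_nnis/lp_nnis comments above:

#include <stdbool.h>

struct hc_policy {
	bool handle_local_health;
	bool handle_remote_health;
	bool attempt_local_resend;
	bool attempt_remote_resend;
};

/* Sketch only: recompute the four flags chosen above. */
static struct hc_policy compute_hc_policy(bool tx_committed, bool recovery,
					  int pb_nnis, int lp_nnis,
					  bool peer_is_router)
{
	struct hc_policy p;

	/* resends are only possible for messages committed for tx */
	p.attempt_local_resend = p.attempt_remote_resend = tx_committed;
	/* a failed recovery ping must not decrement health again */
	p.handle_local_health = p.handle_remote_health = !recovery;

	if (pb_nnis <= 2) {		/* single local NI plus loopback */
		p.handle_local_health = false;
		p.attempt_local_resend = false;
	}
	if (lp_nnis <= 1) {		/* single-homed peer */
		p.attempt_remote_resend = false;
		if (!peer_is_router)	/* routers still need health */
			p.handle_remote_health = false;
	}
	return p;
}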
@@ -660,123 +912,98 @@ lnet_health_check(struct lnet_msg *msg)
 	case LNET_MSG_STATUS_LOCAL_ABORTED:
 	case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
 	case LNET_MSG_STATUS_LOCAL_TIMEOUT:
-		lnet_handle_local_failure(msg);
-		/* add to the re-send queue */
-		goto resend;
-
-	/*
-	 * These errors will not trigger a resend so simply
-	 * finalize the message
-	 */
+		if (handle_local_health)
+			lnet_handle_local_failure(ni);
+		if (attempt_local_resend)
+			return lnet_attempt_msg_resend(msg);
+		break;
 	case LNET_MSG_STATUS_LOCAL_ERROR:
-		lnet_handle_local_failure(msg);
+		if (handle_local_health)
+			lnet_handle_local_failure(ni);
 		return -1;
-
-	/*
-	 * TODO: since the remote dropped the message we can
-	 * attempt a resend safely.
-	 */
 	case LNET_MSG_STATUS_REMOTE_DROPPED:
-		lnet_handle_remote_failure(msg);
-		goto resend;
-
+		if (handle_remote_health)
+			lnet_handle_remote_failure(lpni);
+		if (attempt_remote_resend)
+			return lnet_attempt_msg_resend(msg);
+		break;
 	case LNET_MSG_STATUS_REMOTE_ERROR:
 	case LNET_MSG_STATUS_REMOTE_TIMEOUT:
 	case LNET_MSG_STATUS_NETWORK_TIMEOUT:
-		lnet_handle_remote_failure(msg);
+		if (handle_remote_health)
+			lnet_handle_remote_failure(lpni);
 		return -1;

 	default:
 		LBUG();
 	}

-resend:
-	/* don't resend recovery messages */
-	if (msg->msg_recovery)
-		return -1;
-
-	/*
-	 * if we explicitly indicated we don't want to resend then just
-	 * return
-	 */
-	if (msg->msg_no_resend)
-		return -1;
+	/* no resend is needed */
+	return -1;
+}

-	/* check if the message has exceeded the number of retries */
-	if (msg->msg_retry_count >= lnet_retry_count)
-		return -1;
-	msg->msg_retry_count++;
+static void
+lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status)
+{
+	struct lnet_libmd *md = msg->msg_md;
+	int unlink;

-	lnet_net_lock(msg->msg_tx_cpt);
+	/* Now it's safe to drop my caller's ref */
+	md->md_refcount--;
+	LASSERT(md->md_refcount >= 0);

-	/*
-	 * remove message from the active list and reset it in preparation
-	 * for a resend. Two exception to this
-	 *
-	 * 1. the router case, whe a message is committed for rx when
-	 * received, then tx when it is sent. When committed to both tx and
-	 * rx we don't want to remove it from the active list.
-	 *
-	 * 2. The REPLY case since it uses the same msg block for the GET
-	 * that was received.
-	 */
-	if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) {
-		list_del_init(&msg->msg_activelist);
-		msg->msg_onactivelist = 0;
+	unlink = lnet_md_unlinkable(md);
+	if (md->md_handler) {
+		if ((md->md_flags & LNET_MD_FLAG_ABORTED) && !status) {
+			msg->msg_ev.status = -ETIMEDOUT;
+			CDEBUG(D_NET, "md 0x%p already unlinked\n", md);
+		} else {
+			msg->msg_ev.status = status;
+		}
+		msg->msg_ev.unlinked = unlink;
+		md->md_handler(&msg->msg_ev);
 	}

-	/*
-	 * The msg_target.nid which was originally set
-	 * when calling LNetGet() or LNetPut() might've
-	 * been overwritten if we're routing this message.
-	 * Call lnet_return_tx_credits_locked() to return
-	 * the credit this message consumed. The message will
-	 * consume another credit when it gets resent.
-	 */
-	msg->msg_target.nid = msg->msg_hdr.dest_nid;
-	lnet_msg_decommit_tx(msg, -EAGAIN);
-	msg->msg_sending = 0;
-	msg->msg_receiving = 0;
-	msg->msg_target_is_router = 0;
-
-	CDEBUG(D_NET, "%s->%s:%s:%s - queuing for resend\n",
-	       libcfs_nid2str(msg->msg_hdr.src_nid),
-	       libcfs_nid2str(msg->msg_hdr.dest_nid),
-	       lnet_msgtyp2str(msg->msg_type),
-	       lnet_health_error2str(hstatus));
-
-	list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]);
-	lnet_net_unlock(msg->msg_tx_cpt);
-	wake_up(&the_lnet.ln_mt_waitq);
-	return 0;
-}
+	if (unlink || (md->md_refcount == 0 &&
+		       md->md_threshold == LNET_MD_THRESH_INF))
+		lnet_detach_rsp_tracker(md, cpt);

-static void
-lnet_detach_md(struct lnet_msg *msg, int status)
-{
-	int cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+	if (unlink)
+		lnet_md_unlink(md);

-	lnet_res_lock(cpt);
-	lnet_msg_detach_md(msg, status);
-	lnet_res_unlock(cpt);
+	msg->msg_md = NULL;
 }

 static bool
 lnet_is_health_check(struct lnet_msg *msg)
 {
-	bool hc;
+	bool hc = true;
 	int status = msg->msg_ev.status;

-	/*
-	 * perform a health check for any message committed for transmit
-	 */
-	hc = msg->msg_tx_committed;
+	if ((!msg->msg_tx_committed && !msg->msg_rx_committed) ||
+	    !msg->msg_onactivelist) {
+		CDEBUG(D_NET, "msg %p not committed for send or receive\n",
+		       msg);
+		return false;
+	}
+
+	if ((msg->msg_tx_committed && !msg->msg_txpeer) ||
+	    (msg->msg_rx_committed && !msg->msg_rxpeer)) {
+		/* The optimized GET case does not set msg_rxpeer, but status
+		 * could be zero. Only print the error message if we have a
+		 * non-zero status.
+		 */
+		if (status)
+			CDEBUG(D_NET, "msg %p status %d cannot retry\n", msg,
+			       status);
+		return false;
+	}

 	/* Check for status inconsistencies */
-	if (hc &&
-	    ((!status && msg->msg_health_status != LNET_MSG_STATUS_OK) ||
-	     (status && msg->msg_health_status == LNET_MSG_STATUS_OK))) {
-		CERROR("Msg is in inconsistent state, don't perform health "
-		       "checking (%d, %d)\n", status, msg->msg_health_status);
+	if ((!status && msg->msg_health_status != LNET_MSG_STATUS_OK) ||
+	    (status && msg->msg_health_status == LNET_MSG_STATUS_OK)) {
+		CDEBUG(D_NET, "Msg %p is in inconsistent state, don't perform health "
+		       "checking (%d, %d)\n", msg, status,
+		       msg->msg_health_status);
 		hc = false;
 	}
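The reworked lnet_msg_detach_md() above delivers the completion through md->md_handler rather than through an event queue, and it overrides a success status with -ETIMEDOUT when the MD carries LNET_MD_FLAG_ABORTED; the CDEBUG suggests the MD was already unlinked elsewhere, presumably by response-tracker expiry. The status decision in isolation, a sketch assuming only that the flag is set when the MD was aborted out from under the message:

#include <errno.h>

/* Sketch only: status seen by the MD handler on detach. */
static int md_event_status(unsigned int md_flags, int msg_status)
{
	/* an aborted MD must not report success */
	if ((md_flags & LNET_MD_FLAG_ABORTED) && msg_status == 0)
		return -ETIMEDOUT;

	return msg_status;
}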
@@ -817,6 +1044,32 @@ lnet_health_error2str(enum lnet_msg_hstatus hstatus)
 	}
 }

+bool
+lnet_send_error_simulation(struct lnet_msg *msg,
+			   enum lnet_msg_hstatus *hstatus)
+{
+	if (!msg)
+		return false;
+
+	if (list_empty(&the_lnet.ln_drop_rules))
+		return false;
+
+	/* match only health rules */
+	if (!lnet_drop_rule_match(&msg->msg_hdr, LNET_NID_ANY,
+				  hstatus))
+		return false;
+
+	CDEBUG(D_NET, "src %s(%s)->dst %s: %s simulate health error: %s\n",
+	       libcfs_nid2str(msg->msg_hdr.src_nid),
+	       libcfs_nid2str(msg->msg_txni->ni_nid),
+	       libcfs_nid2str(msg->msg_hdr.dest_nid),
+	       lnet_msgtyp2str(msg->msg_type),
+	       lnet_health_error2str(*hstatus));
+
+	return true;
+}
+EXPORT_SYMBOL(lnet_send_error_simulation);
+
 void
 lnet_finalize(struct lnet_msg *msg, int status)
 {
@@ -824,8 +1077,6 @@ lnet_finalize(struct lnet_msg *msg, int status)
 	int my_slot;
 	int cpt;
 	int rc;
-	int i;
-	bool hc;

 	LASSERT(!in_interrupt());

@@ -834,47 +1085,7 @@ lnet_finalize(struct lnet_msg *msg, int status)

 	msg->msg_ev.status = status;

-	/*
-	 * if this is an ACK or a REPLY then make sure to remove the
-	 * response tracker.
-	 */
-	if (msg->msg_ev.type == LNET_EVENT_REPLY ||
-	    msg->msg_ev.type == LNET_EVENT_ACK) {
-		cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
-		lnet_detach_rsp_tracker(msg->msg_md, cpt);
-	}
-
-	/* if the message is successfully sent, no need to keep the MD around */
-	if (msg->msg_md != NULL && !status)
-		lnet_detach_md(msg, status);
-
-again:
-	hc = lnet_is_health_check(msg);
-
-	/*
-	 * the MD would've been detached from the message if it was
-	 * successfully sent. However, if it wasn't successfully sent the
-	 * MD would be around. And since we recalculate whether to
-	 * health check or not, it's possible that we change our minds and
-	 * we don't want to health check this message. In this case also
-	 * free the MD.
-	 *
-	 * If the message is successful we're going to
-	 * go through the lnet_health_check() function, but that'll just
-	 * increment the appropriate health value and return.
-	 */
-	if (msg->msg_md != NULL && !hc)
-		lnet_detach_md(msg, status);
-
-	rc = 0;
-	if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
-		/* not committed to network yet */
-		LASSERT(!msg->msg_onactivelist);
-		lnet_msg_free(msg);
-		return;
-	}
-
-	if (hc) {
+	if (lnet_is_health_check(msg)) {
 		/*
 		 * Check the health status of the message. If it has one
 		 * of the errors that we're supposed to handle, and it has
@@ -889,13 +1100,25 @@ again:
 		 */
 		if (!lnet_health_check(msg))
 			return;
+	}

-		/*
-		 * if we get here then we need to clean up the md because we're
-		 * finalizing the message.
-		 */
-		if (msg->msg_md != NULL)
-			lnet_detach_md(msg, status);
+	/*
+	 * We're not going to resend this message so detach its MD and invoke
+	 * the appropriate callbacks
+	 */
+	if (msg->msg_md != NULL) {
+		cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+		lnet_res_lock(cpt);
+		lnet_msg_detach_md(msg, cpt, status);
+		lnet_res_unlock(cpt);
+	}
+
+again:
+	if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
+		/* not committed to network yet */
+		LASSERT(!msg->msg_onactivelist);
+		lnet_msg_free(msg);
+		return;
+	}

 	/*
@@ -907,27 +1130,21 @@ again:
 	lnet_net_lock(cpt);
 	container = the_lnet.ln_msg_containers[cpt];
-	list_add_tail(&msg->msg_list, &container->msc_finalizing);

 	/* Recursion breaker.  Don't complete the message here if I am (or
 	 * enough other threads are) already completing messages */
+	my_slot = lnet_check_finalize_recursion_locked(msg,
+					&container->msc_finalizing,
+					container->msc_nfinalizers,
+					container->msc_finalizers);

-	my_slot = -1;
-	for (i = 0; i < container->msc_nfinalizers; i++) {
-		if (container->msc_finalizers[i] == current)
-			break;
-
-		if (my_slot < 0 && container->msc_finalizers[i] == NULL)
-			my_slot = i;
-	}
-
-	if (i < container->msc_nfinalizers || my_slot < 0) {
+	/* enough threads are finalizing */
+	if (my_slot == -1) {
 		lnet_net_unlock(cpt);
 		return;
 	}

-	container->msc_finalizers[my_slot] = current;
-
+	rc = 0;
 	while (!list_empty(&container->msc_finalizing)) {
 		msg = list_entry(container->msc_finalizing.next,
 				 struct lnet_msg, msg_list);
@@ -979,11 +1196,16 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
 		CERROR("%d active msg on exit\n", count);

 	if (container->msc_finalizers != NULL) {
-		LIBCFS_FREE(container->msc_finalizers,
-			    container->msc_nfinalizers *
-			    sizeof(*container->msc_finalizers));
+		CFS_FREE_PTR_ARRAY(container->msc_finalizers,
+				   container->msc_nfinalizers);
 		container->msc_finalizers = NULL;
 	}
+
+	if (container->msc_resenders != NULL) {
+		CFS_FREE_PTR_ARRAY(container->msc_resenders,
+				   container->msc_nfinalizers);
+		container->msc_resenders = NULL;
+	}
 	container->msc_init = 0;
 }

@@ -996,6 +1218,7 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)

 	INIT_LIST_HEAD(&container->msc_active);
 	INIT_LIST_HEAD(&container->msc_finalizing);
+	INIT_LIST_HEAD(&container->msc_resending);

 	/* number of CPUs */
 	container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
@@ -1012,6 +1235,16 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
 		return -ENOMEM;
 	}

+	LIBCFS_CPT_ALLOC(container->msc_resenders, lnet_cpt_table(), cpt,
+			 container->msc_nfinalizers *
+			 sizeof(*container->msc_resenders));
+
+	if (container->msc_resenders == NULL) {
+		CERROR("Failed to allocate message resenders\n");
+		lnet_msg_container_cleanup(container);
+		return -ENOMEM;
+	}
+
 	return rc;
 }
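The container changes above pair three pieces: an msc_resending queue, an msc_resenders slot array sized like msc_finalizers (one slot per CPU in the CPT), and matching cleanup on teardown. A userspace analogue of that lifecycle, a sketch assuming calloc/free in place of LIBCFS_CPT_ALLOC and CFS_FREE_PTR_ARRAY:

#include <errno.h>
#include <stdlib.h>

struct msg_container {
	int nworkers;		/* msc_nfinalizers */
	void **finalizers;	/* msc_finalizers */
	void **resenders;	/* msc_resenders */
};

static void container_cleanup(struct msg_container *c)
{
	free(c->finalizers);	/* CFS_FREE_PTR_ARRAY analogue */
	free(c->resenders);
	c->finalizers = NULL;
	c->resenders = NULL;
}

static int container_setup(struct msg_container *c, int nworkers)
{
	c->nworkers = nworkers;
	c->finalizers = calloc(nworkers, sizeof(*c->finalizers));
	c->resenders = calloc(nworkers, sizeof(*c->resenders));

	if (c->finalizers == NULL || c->resenders == NULL) {
		container_cleanup(c);	/* free whichever half succeeded */
		return -ENOMEM;
	}
	return 0;
}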