From: Amir Shehata Date: Tue, 5 Jun 2018 20:34:52 +0000 (-0700) Subject: LU-9120 lnet: handle local ni failure X-Git-Tag: 2.11.55~65^2^2~22 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=70616605dd44be37068f4e1a4745a2f8b90eb1f5 LU-9120 lnet: handle local ni failure Added an enumerated type listing the different errors which the LND can propagate up to LNet for further handling. All local timeout errors will trigger a resend if the system is configured for resends. Remote errors will not trigger a resend to avoid creating duplicate message scenario on the receiving end. If a transmit error is encountered where we're sure the message wasn't received by the remote end we will attempt a resend. LNet level logic to handle local NI failure. When the LND finalizes a message lnet_finalize() will check if the message completed successfully, if so it increments the healthv of the local NI, but not beyond the max, and if it failed then it'll decrement the healthv but not below 0 and put the message on the resend queue. On local NI failure the local NI is placed on a recovery queue. The monitor thread will wake up and resend all the messages pending. The selection algorithm will properly select the local and remote NIs based on the new healthv. The monitor thread will ping each NI on the local recovery queue. On reply it will check if the NIs healthv is back to maximum, if it is then it will remove it from the recovery queue, otherwise it'll keep it there until it's fully recovered. Test-Parameters: forbuildonly Signed-off-by: Amir Shehata Change-Id: I1cf5c6e74b9c5e5b06b15209f6ac77b49014e270 Reviewed-on: https://review.whamcloud.com/32764 Tested-by: Jenkins Reviewed-by: Sonia Sharma Reviewed-by: Olaf Weber --- diff --git a/lnet/include/lnet/api.h b/lnet/include/lnet/api.h index 9408583..77afdd3 100644 --- a/lnet/include/lnet/api.h +++ b/lnet/include/lnet/api.h @@ -198,7 +198,8 @@ int LNetGet(lnet_nid_t self, struct lnet_process_id target_in, unsigned int portal_in, __u64 match_bits_in, - unsigned int offset_in); + unsigned int offset_in, + bool recovery); /** @} lnet_data */ diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h index f4868e0..9536840 100644 --- a/lnet/include/lnet/lib-lnet.h +++ b/lnet/include/lnet/lib-lnet.h @@ -604,6 +604,8 @@ void lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target, unsigned int offset, unsigned int len); int lnet_send(lnet_nid_t nid, struct lnet_msg *msg, lnet_nid_t rtr_nid); +int lnet_send_ping(lnet_nid_t dest_nid, struct lnet_handle_md *mdh, int nnis, + void *user_ptr, struct lnet_handle_eq eqh, bool recovery); void lnet_return_tx_credits_locked(struct lnet_msg *msg); void lnet_return_rx_credits_locked(struct lnet_msg *msg); void lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp); @@ -691,6 +693,7 @@ void lnet_msg_container_cleanup(struct lnet_msg_container *container); void lnet_msg_containers_destroy(void); int lnet_msg_containers_create(void); +char *lnet_health_error2str(enum lnet_msg_hstatus hstatus); char *lnet_msgtyp2str(int type); void lnet_print_hdr(struct lnet_hdr *hdr); int lnet_fail_nid(lnet_nid_t nid, unsigned int threshold); diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index 3817bcd..dee0a46 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -64,6 +64,20 @@ /* forward refs */ struct lnet_libmd; +enum lnet_msg_hstatus { + LNET_MSG_STATUS_OK = 0, + LNET_MSG_STATUS_LOCAL_INTERRUPT, + LNET_MSG_STATUS_LOCAL_DROPPED, + LNET_MSG_STATUS_LOCAL_ABORTED, + LNET_MSG_STATUS_LOCAL_NO_ROUTE, + LNET_MSG_STATUS_LOCAL_ERROR, + LNET_MSG_STATUS_LOCAL_TIMEOUT, + LNET_MSG_STATUS_REMOTE_ERROR, + LNET_MSG_STATUS_REMOTE_DROPPED, + LNET_MSG_STATUS_REMOTE_TIMEOUT, + LNET_MSG_STATUS_NETWORK_TIMEOUT +}; + struct lnet_msg { struct list_head msg_activelist; struct list_head msg_list; /* Q for credits/MD */ @@ -88,6 +102,13 @@ struct lnet_msg { */ ktime_t msg_deadline; + /* The message health status. */ + enum lnet_msg_hstatus msg_health_status; + /* This is a recovery message */ + bool msg_recovery; + /* flag to indicate that we do not want to resend this message */ + bool msg_no_resend; + /* committed for sending */ unsigned int msg_tx_committed:1; /* CPT # this message committed for sending */ @@ -287,18 +308,11 @@ enum lnet_net_state { LNET_NET_STATE_DELETING }; -enum lnet_ni_state { - /* set when NI block is allocated */ - LNET_NI_STATE_INIT = 0, - /* set when NI is started successfully */ - LNET_NI_STATE_ACTIVE, - /* set when LND notifies NI failed */ - LNET_NI_STATE_FAILED, - /* set when LND notifies NI degraded */ - LNET_NI_STATE_DEGRADED, - /* set when shuttding down NI */ - LNET_NI_STATE_DELETING -}; +#define LNET_NI_STATE_INIT (1 << 0) +#define LNET_NI_STATE_ACTIVE (1 << 1) +#define LNET_NI_STATE_FAILED (1 << 2) +#define LNET_NI_STATE_RECOVERY_PENDING (1 << 3) +#define LNET_NI_STATE_DELETING (1 << 4) enum lnet_stats_type { LNET_STATS_TYPE_SEND = 0, @@ -371,6 +385,12 @@ struct lnet_ni { /* chain on net_ni_cpt */ struct list_head ni_cptlist; + /* chain on the recovery queue */ + struct list_head ni_recovery; + + /* MD handle for recovery ping */ + struct lnet_handle_md ni_ping_mdh; + spinlock_t ni_lock; /* number of CPTs */ @@ -404,7 +424,7 @@ struct lnet_ni { struct lnet_ni_status *ni_status; /* NI FSM */ - enum lnet_ni_state ni_state; + __u32 ni_state; /* per NI LND tunables */ struct lnet_lnd_tunables ni_lnd_tunables; @@ -1046,6 +1066,14 @@ struct lnet { * checking routes, timedout messages and resending messages. */ wait_queue_head_t ln_mt_waitq; + + /* per-cpt resend queues */ + struct list_head **ln_mt_resendqs; + /* local NIs to recover */ + struct list_head ln_mt_localNIRecovq; + /* recovery eq handler */ + struct lnet_handle_eq ln_mt_eqh; + }; #endif diff --git a/lnet/lnet/api-ni.c b/lnet/lnet/api-ni.c index 80e0aeb..c1718ce 100644 --- a/lnet/lnet/api-ni.c +++ b/lnet/lnet/api-ni.c @@ -837,6 +837,7 @@ lnet_prepare(lnet_pid_t requested_pid) INIT_LIST_HEAD(&the_lnet.ln_dc_request); INIT_LIST_HEAD(&the_lnet.ln_dc_working); INIT_LIST_HEAD(&the_lnet.ln_dc_expired); + INIT_LIST_HEAD(&the_lnet.ln_mt_localNIRecovq); init_waitqueue_head(&the_lnet.ln_dc_waitq); rc = lnet_descriptor_setup(); @@ -1085,8 +1086,7 @@ lnet_islocalnet(__u32 net_id) bool lnet_is_ni_healthy_locked(struct lnet_ni *ni) { - if (ni->ni_state == LNET_NI_STATE_ACTIVE || - ni->ni_state == LNET_NI_STATE_DEGRADED) + if (ni->ni_state & LNET_NI_STATE_ACTIVE) return true; return false; @@ -1673,7 +1673,7 @@ lnet_clear_zombies_nis_locked(struct lnet_net *net) list_del_init(&ni->ni_netlist); /* the ni should be in deleting state. If it's not it's * a bug */ - LASSERT(ni->ni_state == LNET_NI_STATE_DELETING); + LASSERT(ni->ni_state & LNET_NI_STATE_DELETING); cfs_percpt_for_each(ref, j, ni->ni_refs) { if (*ref == 0) continue; @@ -1721,7 +1721,10 @@ lnet_shutdown_lndni(struct lnet_ni *ni) struct lnet_net *net = ni->ni_net; lnet_net_lock(LNET_LOCK_EX); - ni->ni_state = LNET_NI_STATE_DELETING; + lnet_ni_lock(ni); + ni->ni_state |= LNET_NI_STATE_DELETING; + ni->ni_state &= ~LNET_NI_STATE_ACTIVE; + lnet_ni_unlock(ni); lnet_ni_unlink_locked(ni); lnet_incr_dlc_seq(); lnet_net_unlock(LNET_LOCK_EX); @@ -1820,6 +1823,7 @@ lnet_shutdown_lndnets(void) list_for_each_entry_safe(msg, tmp, &resend, msg_list) { list_del_init(&msg->msg_list); + msg->msg_no_resend = true; lnet_finalize(msg, -ECANCELED); } @@ -1856,7 +1860,10 @@ lnet_startup_lndni(struct lnet_ni *ni, struct lnet_lnd_tunables *tun) goto failed0; } - ni->ni_state = LNET_NI_STATE_ACTIVE; + lnet_ni_lock(ni); + ni->ni_state |= LNET_NI_STATE_ACTIVE; + ni->ni_state &= ~LNET_NI_STATE_INIT; + lnet_ni_unlock(ni); /* We keep a reference on the loopback net through the loopback NI */ if (net->net_lnd->lnd_type == LOLND) { @@ -2579,10 +2586,17 @@ lnet_get_next_ni_locked(struct lnet_net *mynet, struct lnet_ni *prev) struct lnet_ni *ni; struct lnet_net *net = mynet; + /* + * It is possible that the net has been cleaned out while there is + * a message being sent. This function accessed the net without + * checking if the list is empty + */ if (prev == NULL) { if (net == NULL) net = list_entry(the_lnet.ln_nets.next, struct lnet_net, net_list); + if (list_empty(&net->net_ni_list)) + return NULL; ni = list_entry(net->net_ni_list.next, struct lnet_ni, ni_netlist); @@ -2604,6 +2618,8 @@ lnet_get_next_ni_locked(struct lnet_net *mynet, struct lnet_ni *prev) /* get the next net */ net = list_entry(prev->ni_net->net_list.next, struct lnet_net, net_list); + if (list_empty(&net->net_ni_list)) + return NULL; /* get the ni on it */ ni = list_entry(net->net_ni_list.next, struct lnet_ni, ni_netlist); @@ -2611,6 +2627,9 @@ lnet_get_next_ni_locked(struct lnet_net *mynet, struct lnet_ni *prev) return ni; } + if (list_empty(&prev->ni_netlist)) + return NULL; + /* there are more nis left */ ni = list_entry(prev->ni_netlist.next, struct lnet_ni, ni_netlist); @@ -3627,7 +3646,7 @@ static int lnet_ping(struct lnet_process_id id, signed long timeout, rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL, - LNET_PROTO_PING_MATCHBITS, 0); + LNET_PROTO_PING_MATCHBITS, 0, false); if (rc != 0) { /* Don't CERROR; this could be deliberate! */ diff --git a/lnet/lnet/config.c b/lnet/lnet/config.c index 57f63fa..70240ad 100644 --- a/lnet/lnet/config.c +++ b/lnet/lnet/config.c @@ -458,6 +458,7 @@ lnet_ni_alloc_common(struct lnet_net *net, char *iface) spin_lock_init(&ni->ni_lock); INIT_LIST_HEAD(&ni->ni_cptlist); INIT_LIST_HEAD(&ni->ni_netlist); + INIT_LIST_HEAD(&ni->ni_recovery); ni->ni_refs = cfs_percpt_alloc(lnet_cpt_table(), sizeof(*ni->ni_refs[0])); if (ni->ni_refs == NULL) @@ -482,7 +483,7 @@ lnet_ni_alloc_common(struct lnet_net *net, char *iface) ni->ni_net_ns = NULL; ni->ni_last_alive = ktime_get_real_seconds(); - ni->ni_state = LNET_NI_STATE_INIT; + ni->ni_state |= LNET_NI_STATE_INIT; list_add_tail(&ni->ni_netlist, &net->net_ni_added); /* diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c index c82c516..969cb07 100644 --- a/lnet/lnet/lib-move.c +++ b/lnet/lnet/lib-move.c @@ -764,8 +764,10 @@ lnet_ni_send(struct lnet_ni *ni, struct lnet_msg *msg) (msg->msg_txcredit && msg->msg_peertxcredit)); rc = (ni->ni_net->net_lnd->lnd_send)(ni, priv, msg); - if (rc < 0) + if (rc < 0) { + msg->msg_no_resend = true; lnet_finalize(msg, rc); + } } static int @@ -946,8 +948,10 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send) CNETERR("Dropping message for %s: peer not alive\n", libcfs_id2str(msg->msg_target)); - if (do_send) + if (do_send) { + msg->msg_health_status = LNET_MSG_STATUS_LOCAL_DROPPED; lnet_finalize(msg, -EHOSTUNREACH); + } lnet_net_lock(cpt); return -EHOSTUNREACH; @@ -960,8 +964,10 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send) CNETERR("Aborting message for %s: LNetM[DE]Unlink() already " "called on the MD/ME.\n", libcfs_id2str(msg->msg_target)); - if (do_send) + if (do_send) { + msg->msg_no_resend = true; lnet_finalize(msg, -ECANCELED); + } lnet_net_lock(cpt); return -ECANCELED; @@ -1246,6 +1252,7 @@ lnet_drop_routed_msgs_locked(struct list_head *list, int cpt) lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL, 0, 0, 0, msg->msg_hdr.payload_length); list_del_init(&msg->msg_list); + msg->msg_no_resend = true; lnet_finalize(msg, -ECANCELED); } @@ -2505,6 +2512,15 @@ again: } /* + * Cache the original src_nid. If we need to resend the message + * then we'll need to know whether the src_nid was originally + * specified for this message. If it was originally specified, + * then we need to keep using the same src_nid since it's + * continuing the same sequence of messages. + */ + msg->msg_src_nid_param = src_nid; + + /* * Now that we have a peer_ni, check if we want to discover * the peer. Traffic to the LNET_RESERVED_PORTAL should not * trigger discovery. @@ -2521,7 +2537,6 @@ again: /* The peer may have changed. */ peer = lpni->lpni_peer_net->lpn_peer; /* queue message and return */ - msg->msg_src_nid_param = src_nid; msg->msg_rtr_nid_param = rtr_nid; msg->msg_sending = 0; list_add_tail(&msg->msg_list, &peer->lp_dc_pendq); @@ -2555,7 +2570,12 @@ again: else send_case |= REMOTE_DST; - if (!lnet_peer_is_multi_rail(peer)) + /* + * if this is a non-MR peer or if we're recovering a peer ni then + * let's consider this an NMR case so we can hit the destination + * NID. + */ + if (!lnet_peer_is_multi_rail(peer) || msg->msg_recovery) send_case |= NMR_DST; else send_case |= MR_DST; @@ -2602,10 +2622,11 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid) * in the future */ /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */ - LASSERT (msg->msg_txpeer == NULL); - LASSERT (!msg->msg_sending); - LASSERT (!msg->msg_target_is_router); - LASSERT (!msg->msg_receiving); + LASSERT(msg->msg_txpeer == NULL); + LASSERT(msg->msg_txni == NULL); + LASSERT(!msg->msg_sending); + LASSERT(!msg->msg_target_is_router); + LASSERT(!msg->msg_receiving); msg->msg_sending = 1; @@ -2622,6 +2643,323 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid) return 0; } +static void +lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt) +{ + struct lnet_msg *msg; + + while (!list_empty(resendq)) { + struct lnet_peer_ni *lpni; + + msg = list_entry(resendq->next, struct lnet_msg, + msg_list); + + list_del_init(&msg->msg_list); + + lpni = lnet_find_peer_ni_locked(msg->msg_hdr.dest_nid); + if (!lpni) { + lnet_net_unlock(cpt); + CERROR("Expected that a peer is already created for %s\n", + libcfs_nid2str(msg->msg_hdr.dest_nid)); + msg->msg_no_resend = true; + lnet_finalize(msg, -EFAULT); + lnet_net_lock(cpt); + } else { + struct lnet_peer *peer; + int rc; + lnet_nid_t src_nid = LNET_NID_ANY; + + /* + * if this message is not being routed and the + * peer is non-MR then we must use the same + * src_nid that was used in the original send. + * Otherwise if we're routing the message (IE + * we're a router) then we can use any of our + * local interfaces. It doesn't matter to the + * final destination. + */ + peer = lpni->lpni_peer_net->lpn_peer; + if (!msg->msg_routing && + !lnet_peer_is_multi_rail(peer)) + src_nid = le64_to_cpu(msg->msg_hdr.src_nid); + + /* + * If we originally specified a src NID, then we + * must attempt to reuse it in the resend as well. + */ + if (msg->msg_src_nid_param != LNET_NID_ANY) + src_nid = msg->msg_src_nid_param; + lnet_peer_ni_decref_locked(lpni); + + lnet_net_unlock(cpt); + rc = lnet_send(src_nid, msg, LNET_NID_ANY); + if (rc) { + CERROR("Error sending %s to %s: %d\n", + lnet_msgtyp2str(msg->msg_type), + libcfs_id2str(msg->msg_target), rc); + msg->msg_no_resend = true; + lnet_finalize(msg, rc); + } + lnet_net_lock(cpt); + } + } +} + +static void +lnet_resend_pending_msgs(void) +{ + int i; + + cfs_cpt_for_each(i, lnet_cpt_table()) { + lnet_net_lock(i); + lnet_resend_pending_msgs_locked(the_lnet.ln_mt_resendqs[i], i); + lnet_net_unlock(i); + } +} + +/* called with cpt and ni_lock held */ +static void +lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt) +{ + struct lnet_handle_md recovery_mdh; + + LNetInvalidateMDHandle(&recovery_mdh); + + if (ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING) { + recovery_mdh = ni->ni_ping_mdh; + LNetInvalidateMDHandle(&ni->ni_ping_mdh); + } + lnet_ni_unlock(ni); + lnet_net_unlock(cpt); + if (!LNetMDHandleIsInvalid(recovery_mdh)) + LNetMDUnlink(recovery_mdh); + lnet_net_lock(cpt); + lnet_ni_lock(ni); +} + +static void +lnet_recover_local_nis(void) +{ + struct list_head processed_list; + struct list_head local_queue; + struct lnet_handle_md mdh; + struct lnet_ni *tmp; + struct lnet_ni *ni; + lnet_nid_t nid; + int healthv; + int rc; + + INIT_LIST_HEAD(&local_queue); + INIT_LIST_HEAD(&processed_list); + + /* + * splice the recovery queue on a local queue. We will iterate + * through the local queue and update it as needed. Once we're + * done with the traversal, we'll splice the local queue back on + * the head of the ln_mt_localNIRecovq. Any newly added local NIs + * will be traversed in the next iteration. + */ + lnet_net_lock(0); + list_splice_init(&the_lnet.ln_mt_localNIRecovq, + &local_queue); + lnet_net_unlock(0); + + list_for_each_entry_safe(ni, tmp, &local_queue, ni_recovery) { + /* + * if an NI is being deleted or it is now healthy, there + * is no need to keep it around in the recovery queue. + * The monitor thread is the only thread responsible for + * removing the NI from the recovery queue. + * Multiple threads can be adding NIs to the recovery + * queue. + */ + healthv = atomic_read(&ni->ni_healthv); + + lnet_net_lock(0); + lnet_ni_lock(ni); + if (!(ni->ni_state & LNET_NI_STATE_ACTIVE) || + healthv == LNET_MAX_HEALTH_VALUE) { + list_del_init(&ni->ni_recovery); + lnet_unlink_ni_recovery_mdh_locked(ni, 0); + lnet_ni_unlock(ni); + lnet_ni_decref_locked(ni, 0); + lnet_net_unlock(0); + continue; + } + lnet_ni_unlock(ni); + lnet_net_unlock(0); + + /* + * protect the ni->ni_state field. Once we call the + * lnet_send_ping function it's possible we receive + * a response before we check the rc. The lock ensures + * a stable value for the ni_state RECOVERY_PENDING bit + */ + lnet_ni_lock(ni); + if (!(ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING)) { + ni->ni_state |= LNET_NI_STATE_RECOVERY_PENDING; + lnet_ni_unlock(ni); + mdh = ni->ni_ping_mdh; + /* + * Invalidate the ni mdh in case it's deleted. + * We'll unlink the mdh in this case below. + */ + LNetInvalidateMDHandle(&ni->ni_ping_mdh); + nid = ni->ni_nid; + + /* + * remove the NI from the local queue and drop the + * reference count to it while we're recovering + * it. The reason for that, is that the NI could + * be deleted, and the way the code is structured + * is if we don't drop the NI, then the deletion + * code will enter a loop waiting for the + * reference count to be removed while holding the + * ln_mutex_lock(). When we look up the peer to + * send to in lnet_select_pathway() we will try to + * lock the ln_mutex_lock() as well, leading to + * a deadlock. By dropping the refcount and + * removing it from the list, we allow for the NI + * to be removed, then we use the cached NID to + * look it up again. If it's gone, then we just + * continue examining the rest of the queue. + */ + lnet_net_lock(0); + list_del_init(&ni->ni_recovery); + lnet_ni_decref_locked(ni, 0); + lnet_net_unlock(0); + + rc = lnet_send_ping(nid, &mdh, + LNET_INTERFACES_MIN, (void *)nid, + the_lnet.ln_mt_eqh, true); + /* lookup the nid again */ + lnet_net_lock(0); + ni = lnet_nid2ni_locked(nid, 0); + if (!ni) { + /* + * the NI has been deleted when we dropped + * the ref count + */ + lnet_net_unlock(0); + LNetMDUnlink(mdh); + continue; + } + /* + * Same note as in lnet_recover_peer_nis(). When + * we're sending the ping, the NI is free to be + * deleted or manipulated. By this point it + * could've been added back on the recovery queue, + * and a refcount taken on it. + * So we can't just add it blindly again or we'll + * corrupt the queue. We must check under lock if + * it's not on any list and if not then add it + * to the processed list, which will eventually be + * spliced back on to the recovery queue. + */ + ni->ni_ping_mdh = mdh; + if (list_empty(&ni->ni_recovery)) { + list_add_tail(&ni->ni_recovery, &processed_list); + lnet_ni_addref_locked(ni, 0); + } + lnet_net_unlock(0); + + lnet_ni_lock(ni); + if (rc) + ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING; + } + lnet_ni_unlock(ni); + } + + /* + * put back the remaining NIs on the ln_mt_localNIRecovq to be + * reexamined in the next iteration. + */ + list_splice_init(&processed_list, &local_queue); + lnet_net_lock(0); + list_splice(&local_queue, &the_lnet.ln_mt_localNIRecovq); + lnet_net_unlock(0); +} + +static struct list_head ** +lnet_create_array_of_queues(void) +{ + struct list_head **qs; + struct list_head *q; + int i; + + qs = cfs_percpt_alloc(lnet_cpt_table(), + sizeof(struct list_head)); + if (!qs) { + CERROR("Failed to allocate queues\n"); + return NULL; + } + + cfs_percpt_for_each(q, i, qs) + INIT_LIST_HEAD(q); + + return qs; +} + +static int +lnet_resendqs_create(void) +{ + struct list_head **resendqs; + resendqs = lnet_create_array_of_queues(); + + if (!resendqs) + return -ENOMEM; + + lnet_net_lock(LNET_LOCK_EX); + the_lnet.ln_mt_resendqs = resendqs; + lnet_net_unlock(LNET_LOCK_EX); + + return 0; +} + +static void +lnet_clean_local_ni_recoveryq(void) +{ + struct lnet_ni *ni; + + /* This is only called when the monitor thread has stopped */ + lnet_net_lock(0); + + while (!list_empty(&the_lnet.ln_mt_localNIRecovq)) { + ni = list_entry(the_lnet.ln_mt_localNIRecovq.next, + struct lnet_ni, ni_recovery); + list_del_init(&ni->ni_recovery); + lnet_ni_lock(ni); + lnet_unlink_ni_recovery_mdh_locked(ni, 0); + lnet_ni_unlock(ni); + lnet_ni_decref_locked(ni, 0); + } + + lnet_net_unlock(0); +} + +static void +lnet_clean_resendqs(void) +{ + struct lnet_msg *msg, *tmp; + struct list_head msgs; + int i; + + INIT_LIST_HEAD(&msgs); + + cfs_cpt_for_each(i, lnet_cpt_table()) { + lnet_net_lock(i); + list_splice_init(the_lnet.ln_mt_resendqs[i], &msgs); + lnet_net_unlock(i); + list_for_each_entry_safe(msg, tmp, &msgs, msg_list) { + list_del_init(&msg->msg_list); + msg->msg_no_resend = true; + lnet_finalize(msg, -ESHUTDOWN); + } + } + + cfs_percpt_free(the_lnet.ln_mt_resendqs); +} + static int lnet_monitor_thread(void *arg) { @@ -2641,6 +2979,10 @@ lnet_monitor_thread(void *arg) if (lnet_router_checker_active()) lnet_check_routers(); + lnet_resend_pending_msgs(); + + lnet_recover_local_nis(); + /* * TODO do we need to check if we should sleep without * timeout? Technically, an active system will always @@ -2667,42 +3009,183 @@ lnet_monitor_thread(void *arg) return 0; } -int lnet_monitor_thr_start(void) +/* + * lnet_send_ping + * Sends a ping. + * Returns == 0 if success + * Returns > 0 if LNetMDBind or prior fails + * Returns < 0 if LNetGet fails + */ +int +lnet_send_ping(lnet_nid_t dest_nid, + struct lnet_handle_md *mdh, int nnis, + void *user_data, struct lnet_handle_eq eqh, bool recovery) { + struct lnet_md md = { NULL }; + struct lnet_process_id id; + struct lnet_ping_buffer *pbuf; int rc; + + if (dest_nid == LNET_NID_ANY) { + rc = -EHOSTUNREACH; + goto fail_error; + } + + pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS); + if (!pbuf) { + rc = ENOMEM; + goto fail_error; + } + + /* initialize md content */ + md.start = &pbuf->pb_info; + md.length = LNET_PING_INFO_SIZE(nnis); + md.threshold = 2; /* GET/REPLY */ + md.max_size = 0; + md.options = LNET_MD_TRUNCATE; + md.user_ptr = user_data; + md.eq_handle = eqh; + + rc = LNetMDBind(md, LNET_UNLINK, mdh); + if (rc) { + lnet_ping_buffer_decref(pbuf); + CERROR("Can't bind MD: %d\n", rc); + rc = -rc; /* change the rc to positive */ + goto fail_error; + } + id.pid = LNET_PID_LUSTRE; + id.nid = dest_nid; + + rc = LNetGet(LNET_NID_ANY, *mdh, id, + LNET_RESERVED_PORTAL, + LNET_PROTO_PING_MATCHBITS, 0, recovery); + + if (rc) + goto fail_unlink_md; + + return 0; + +fail_unlink_md: + LNetMDUnlink(*mdh); + LNetInvalidateMDHandle(mdh); +fail_error: + return rc; +} + +static void +lnet_mt_event_handler(struct lnet_event *event) +{ + lnet_nid_t nid = (lnet_nid_t) event->md.user_ptr; + struct lnet_ni *ni; + struct lnet_ping_buffer *pbuf; + + /* TODO: remove assert */ + LASSERT(event->type == LNET_EVENT_REPLY || + event->type == LNET_EVENT_SEND || + event->type == LNET_EVENT_UNLINK); + + CDEBUG(D_NET, "Received event: %d status: %d\n", event->type, + event->status); + + switch (event->type) { + case LNET_EVENT_REPLY: + /* + * If the NI has been restored completely then remove from + * the recovery queue + */ + lnet_net_lock(0); + ni = lnet_nid2ni_locked(nid, 0); + if (!ni) { + lnet_net_unlock(0); + break; + } + lnet_ni_lock(ni); + ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING; + lnet_ni_unlock(ni); + lnet_net_unlock(0); + break; + case LNET_EVENT_SEND: + CDEBUG(D_NET, "%s recovery message sent %s:%d\n", + libcfs_nid2str(nid), + (event->status) ? "unsuccessfully" : + "successfully", event->status); + break; + case LNET_EVENT_UNLINK: + /* nothing to do */ + CDEBUG(D_NET, "%s recovery ping unlinked\n", + libcfs_nid2str(nid)); + break; + default: + CERROR("Unexpected event: %d\n", event->type); + return; + } + if (event->unlinked) { + pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start); + lnet_ping_buffer_decref(pbuf); + } +} + +int lnet_monitor_thr_start(void) +{ + int rc = 0; struct task_struct *task; - LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN); + if (the_lnet.ln_mt_state != LNET_MT_STATE_SHUTDOWN) + return -EALREADY; - sema_init(&the_lnet.ln_mt_signal, 0); + rc = lnet_resendqs_create(); + if (rc) + return rc; + + rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh); + if (rc != 0) { + CERROR("Can't allocate monitor thread EQ: %d\n", rc); + goto clean_queues; + } /* Pre monitor thread start processing */ rc = lnet_router_pre_mt_start(); - if (!rc) - return rc; + if (rc) + goto free_mem; + + sema_init(&the_lnet.ln_mt_signal, 0); the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING; task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread"); if (IS_ERR(task)) { rc = PTR_ERR(task); CERROR("Can't start monitor thread: %d\n", rc); - /* block until event callback signals exit */ - down(&the_lnet.ln_mt_signal); - - /* clean up */ - lnet_router_cleanup(); - the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN; - return -ENOMEM; + goto clean_thread; } /* post monitor thread start processing */ lnet_router_post_mt_start(); return 0; + +clean_thread: + the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING; + /* block until event callback signals exit */ + down(&the_lnet.ln_mt_signal); + /* clean up */ + lnet_router_cleanup(); +free_mem: + the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN; + lnet_clean_resendqs(); + lnet_clean_local_ni_recoveryq(); + LNetEQFree(the_lnet.ln_mt_eqh); + LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh); + return rc; +clean_queues: + lnet_clean_resendqs(); + lnet_clean_local_ni_recoveryq(); + return rc; } void lnet_monitor_thr_stop(void) { + int rc; + if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN) return; @@ -2716,7 +3199,12 @@ void lnet_monitor_thr_stop(void) down(&the_lnet.ln_mt_signal); LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN); + /* perform cleanup tasks */ lnet_router_cleanup(); + lnet_clean_resendqs(); + lnet_clean_local_ni_recoveryq(); + rc = LNetEQFree(the_lnet.ln_mt_eqh); + LASSERT(rc == 0); return; } @@ -3420,6 +3908,8 @@ lnet_drop_delayed_msg_list(struct list_head *head, char *reason) lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt, msg->msg_private, msg->msg_len, msg->msg_type); + + msg->msg_no_resend = true; /* * NB: message will not generate event because w/o attached MD, * but we still should give error code so lnet_msg_decommit() @@ -3583,6 +4073,7 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack, if (rc != 0) { CNETERR("Error sending PUT to %s: %d\n", libcfs_id2str(target), rc); + msg->msg_no_resend = true; lnet_finalize(msg, rc); } @@ -3712,7 +4203,7 @@ EXPORT_SYMBOL(lnet_set_reply_msg_len); int LNetGet(lnet_nid_t self, struct lnet_handle_md mdh, struct lnet_process_id target, unsigned int portal, - __u64 match_bits, unsigned int offset) + __u64 match_bits, unsigned int offset, bool recovery) { struct lnet_msg *msg; struct lnet_libmd *md; @@ -3736,6 +4227,8 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh, return -ENOMEM; } + msg->msg_recovery = recovery; + cpt = lnet_cpt_of_cookie(mdh.cookie); lnet_res_lock(cpt); @@ -3779,6 +4272,7 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh, if (rc < 0) { CNETERR("Error sending GET to %s: %d\n", libcfs_id2str(target), rc); + msg->msg_no_resend = true; lnet_finalize(msg, rc); } diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 2393ed5..12a8c0c 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -461,14 +461,252 @@ lnet_complete_msg_locked(struct lnet_msg *msg, int cpt) return 0; } +static void +lnet_dec_healthv_locked(atomic_t *healthv) +{ + int h = atomic_read(healthv); + + if (h < lnet_health_sensitivity) { + atomic_set(healthv, 0); + } else { + h -= lnet_health_sensitivity; + atomic_set(healthv, h); + } +} + +static inline void +lnet_inc_healthv(atomic_t *healthv) +{ + atomic_add_unless(healthv, 1, LNET_MAX_HEALTH_VALUE); +} + +static void +lnet_handle_local_failure(struct lnet_msg *msg) +{ + struct lnet_ni *local_ni; + + local_ni = msg->msg_txni; + + /* + * the lnet_net_lock(0) is used to protect the addref on the ni + * and the recovery queue. + */ + lnet_net_lock(0); + /* the mt could've shutdown and cleaned up the queues */ + if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) { + lnet_net_unlock(0); + return; + } + + lnet_dec_healthv_locked(&local_ni->ni_healthv); + /* + * add the NI to the recovery queue if it's not already there + * and it's health value is actually below the maximum. It's + * possible that the sensitivity might be set to 0, and the health + * value will not be reduced. In this case, there is no reason to + * invoke recovery + */ + if (list_empty(&local_ni->ni_recovery) && + atomic_read(&local_ni->ni_healthv) < LNET_MAX_HEALTH_VALUE) { + CERROR("ni %s added to recovery queue. Health = %d\n", + libcfs_nid2str(local_ni->ni_nid), + atomic_read(&local_ni->ni_healthv)); + list_add_tail(&local_ni->ni_recovery, + &the_lnet.ln_mt_localNIRecovq); + lnet_ni_addref_locked(local_ni, 0); + } + lnet_net_unlock(0); +} + +/* + * Do a health check on the message: + * return -1 if we're not going to handle the error + * success case will return -1 as well + * return 0 if it the message is requeued for send + */ +static int +lnet_health_check(struct lnet_msg *msg) +{ + enum lnet_msg_hstatus hstatus = msg->msg_health_status; + + /* TODO: lnet_incr_hstats(hstatus); */ + + LASSERT(msg->msg_txni); + + if (hstatus != LNET_MSG_STATUS_OK && + ktime_compare(ktime_get(), msg->msg_deadline) >= 0) + return -1; + + /* if we're shutting down no point in handling health. */ + if (the_lnet.ln_state != LNET_STATE_RUNNING) + return -1; + + switch (hstatus) { + case LNET_MSG_STATUS_OK: + lnet_inc_healthv(&msg->msg_txni->ni_healthv); + /* we can finalize this message */ + return -1; + case LNET_MSG_STATUS_LOCAL_INTERRUPT: + case LNET_MSG_STATUS_LOCAL_DROPPED: + case LNET_MSG_STATUS_LOCAL_ABORTED: + case LNET_MSG_STATUS_LOCAL_NO_ROUTE: + case LNET_MSG_STATUS_LOCAL_TIMEOUT: + lnet_handle_local_failure(msg); + /* add to the re-send queue */ + goto resend; + + /* + * TODO: since the remote dropped the message we can + * attempt a resend safely. + */ + case LNET_MSG_STATUS_REMOTE_DROPPED: + break; + + /* + * These errors will not trigger a resend so simply + * finalize the message + */ + case LNET_MSG_STATUS_LOCAL_ERROR: + lnet_handle_local_failure(msg); + return -1; + case LNET_MSG_STATUS_REMOTE_ERROR: + case LNET_MSG_STATUS_REMOTE_TIMEOUT: + case LNET_MSG_STATUS_NETWORK_TIMEOUT: + return -1; + } + +resend: + /* don't resend recovery messages */ + if (msg->msg_recovery) + return -1; + + /* + * if we explicitly indicated we don't want to resend then just + * return + */ + if (msg->msg_no_resend) + return -1; + + lnet_net_lock(msg->msg_tx_cpt); + + /* + * remove message from the active list and reset it in preparation + * for a resend. Two exception to this + * + * 1. the router case, whe a message is committed for rx when + * received, then tx when it is sent. When committed to both tx and + * rx we don't want to remove it from the active list. + * + * 2. The REPLY case since it uses the same msg block for the GET + * that was received. + */ + if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) { + list_del_init(&msg->msg_activelist); + msg->msg_onactivelist = 0; + } + /* + * The msg_target.nid which was originally set + * when calling LNetGet() or LNetPut() might've + * been overwritten if we're routing this message. + * Call lnet_return_tx_credits_locked() to return + * the credit this message consumed. The message will + * consume another credit when it gets resent. + */ + msg->msg_target.nid = msg->msg_hdr.dest_nid; + lnet_msg_decommit_tx(msg, -EAGAIN); + msg->msg_sending = 0; + msg->msg_receiving = 0; + msg->msg_target_is_router = 0; + + CDEBUG(D_NET, "%s->%s:%s:%s - queuing for resend\n", + libcfs_nid2str(msg->msg_hdr.src_nid), + libcfs_nid2str(msg->msg_hdr.dest_nid), + lnet_msgtyp2str(msg->msg_type), + lnet_health_error2str(hstatus)); + + list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]); + lnet_net_unlock(msg->msg_tx_cpt); + + wake_up(&the_lnet.ln_mt_waitq); + return 0; +} + +static void +lnet_detach_md(struct lnet_msg *msg, int status) +{ + int cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie); + + lnet_res_lock(cpt); + lnet_msg_detach_md(msg, status); + lnet_res_unlock(cpt); +} + +static bool +lnet_is_health_check(struct lnet_msg *msg) +{ + bool hc; + int status = msg->msg_ev.status; + + /* + * perform a health check for any message committed for transmit + */ + hc = msg->msg_tx_committed; + + /* Check for status inconsistencies */ + if (hc && + ((!status && msg->msg_health_status != LNET_MSG_STATUS_OK) || + (status && msg->msg_health_status == LNET_MSG_STATUS_OK))) { + CERROR("Msg is in inconsistent state, don't perform health " + "checking (%d, %d)\n", status, msg->msg_health_status); + hc = false; + } + + CDEBUG(D_NET, "health check = %d, status = %d, hstatus = %d\n", + hc, status, msg->msg_health_status); + + return hc; +} + +char * +lnet_health_error2str(enum lnet_msg_hstatus hstatus) +{ + switch (hstatus) { + case LNET_MSG_STATUS_LOCAL_INTERRUPT: + return "LOCAL_INTERRUPT"; + case LNET_MSG_STATUS_LOCAL_DROPPED: + return "LOCAL_DROPPED"; + case LNET_MSG_STATUS_LOCAL_ABORTED: + return "LOCAL_ABORTED"; + case LNET_MSG_STATUS_LOCAL_NO_ROUTE: + return "LOCAL_NO_ROUTE"; + case LNET_MSG_STATUS_LOCAL_TIMEOUT: + return "LOCAL_TIMEOUT"; + case LNET_MSG_STATUS_LOCAL_ERROR: + return "LOCAL_ERROR"; + case LNET_MSG_STATUS_REMOTE_DROPPED: + return "REMOTE_DROPPED"; + case LNET_MSG_STATUS_REMOTE_ERROR: + return "REMOTE_ERROR"; + case LNET_MSG_STATUS_REMOTE_TIMEOUT: + return "REMOTE_TIMEOUT"; + case LNET_MSG_STATUS_NETWORK_TIMEOUT: + return "NETWORK_TIMEOUT"; + case LNET_MSG_STATUS_OK: + return "OK"; + default: + return ""; + } +} + void lnet_finalize(struct lnet_msg *msg, int status) { - struct lnet_msg_container *container; - int my_slot; - int cpt; - int rc; - int i; + struct lnet_msg_container *container; + int my_slot; + int cpt; + int rc; + int i; + bool hc; LASSERT(!in_interrupt()); @@ -477,15 +715,28 @@ lnet_finalize(struct lnet_msg *msg, int status) msg->msg_ev.status = status; - if (msg->msg_md != NULL) { - cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie); + /* if the message is successfully sent, no need to keep the MD around */ + if (msg->msg_md != NULL && !status) + lnet_detach_md(msg, status); - lnet_res_lock(cpt); - lnet_msg_detach_md(msg, status); - lnet_res_unlock(cpt); - } +again: + hc = lnet_is_health_check(msg); + + /* + * the MD would've been detached from the message if it was + * successfully sent. However, if it wasn't successfully sent the + * MD would be around. And since we recalculate whether to + * health check or not, it's possible that we change our minds and + * we don't want to health check this message. In this case also + * free the MD. + * + * If the message is successful we're going to + * go through the lnet_health_check() function, but that'll just + * increment the appropriate health value and return. + */ + if (msg->msg_md != NULL && !hc) + lnet_detach_md(msg, status); - again: rc = 0; if (!msg->msg_tx_committed && !msg->msg_rx_committed) { /* not committed to network yet */ @@ -494,6 +745,30 @@ lnet_finalize(struct lnet_msg *msg, int status) return; } + if (hc) { + /* + * Check the health status of the message. If it has one + * of the errors that we're supposed to handle, and it has + * not timed out, then + * 1. Decrement the appropriate health_value + * 2. queue the message on the resend queue + + * if the message send is success, timed out or failed in the + * health check for any reason then we'll just finalize the + * message. Otherwise just return since the message has been + * put on the resend queue. + */ + if (!lnet_health_check(msg)) + return; + + /* + * if we get here then we need to clean up the md because we're + * finalizing the message. + */ + if (msg->msg_md != NULL) + lnet_detach_md(msg, status); + } + /* * NB: routed message can be committed for both receiving and sending, * we should finalize in LIFO order and keep counters correct. @@ -528,7 +803,7 @@ lnet_finalize(struct lnet_msg *msg, int status) msg = list_entry(container->msc_finalizing.next, struct lnet_msg, msg_list); - list_del(&msg->msg_list); + list_del_init(&msg->msg_list); /* NB drops and regains the lnet lock if it actually does * anything, so my finalizing friends can chomp along too */ @@ -566,7 +841,7 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container) struct lnet_msg, msg_activelist); LASSERT(msg->msg_onactivelist); msg->msg_onactivelist = 0; - list_del(&msg->msg_activelist); + list_del_init(&msg->msg_activelist); lnet_msg_free(msg); count++; } diff --git a/lnet/lnet/peer.c b/lnet/lnet/peer.c index f2b0819..80dd177 100644 --- a/lnet/lnet/peer.c +++ b/lnet/lnet/peer.c @@ -2719,9 +2719,7 @@ static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp) static int lnet_peer_send_ping(struct lnet_peer *lp) __must_hold(&lp->lp_lock) { - struct lnet_md md = { NULL }; - struct lnet_process_id id; - struct lnet_ping_buffer *pbuf; + lnet_nid_t pnid; int nnis; int rc; int cpt; @@ -2730,55 +2728,37 @@ __must_hold(&lp->lp_lock) lp->lp_state &= ~LNET_PEER_FORCE_PING; spin_unlock(&lp->lp_lock); - nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN); - pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS); - if (!pbuf) { - rc = -ENOMEM; - goto fail_error; - } - - /* initialize md content */ - md.start = &pbuf->pb_info; - md.length = LNET_PING_INFO_SIZE(nnis); - md.threshold = 2; /* GET/REPLY */ - md.max_size = 0; - md.options = LNET_MD_TRUNCATE; - md.user_ptr = lp; - md.eq_handle = the_lnet.ln_dc_eqh; - - rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_ping_mdh); - if (rc != 0) { - lnet_ping_buffer_decref(pbuf); - CERROR("Can't bind MD: %d\n", rc); - goto fail_error; - } cpt = lnet_net_lock_current(); /* Refcount for MD. */ lnet_peer_addref_locked(lp); - id.pid = LNET_PID_LUSTRE; - id.nid = lnet_peer_select_nid(lp); + pnid = lnet_peer_select_nid(lp); lnet_net_unlock(cpt); - if (id.nid == LNET_NID_ANY) { - rc = -EHOSTUNREACH; - goto fail_unlink_md; - } + nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN); - rc = LNetGet(LNET_NID_ANY, lp->lp_ping_mdh, id, - LNET_RESERVED_PORTAL, - LNET_PROTO_PING_MATCHBITS, 0); + rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp, + the_lnet.ln_dc_eqh, false); - if (rc) - goto fail_unlink_md; + /* + * if LNetMDBind in lnet_send_ping fails we need to decrement the + * refcount on the peer, otherwise LNetMDUnlink will be called + * which will eventually do that. + */ + if (rc > 0) { + lnet_net_lock(cpt); + lnet_peer_decref_locked(lp); + lnet_net_unlock(cpt); + rc = -rc; /* change the rc to negative value */ + goto fail_error; + } else if (rc < 0) { + goto fail_error; + } CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid)); spin_lock(&lp->lp_lock); return 0; -fail_unlink_md: - LNetMDUnlink(lp->lp_ping_mdh); - LNetInvalidateMDHandle(&lp->lp_ping_mdh); fail_error: CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc); /* diff --git a/lnet/lnet/router.c b/lnet/lnet/router.c index c682ddf..14d3cae 100644 --- a/lnet/lnet/router.c +++ b/lnet/lnet/router.c @@ -1107,7 +1107,7 @@ lnet_ping_router_locked(struct lnet_peer_ni *rtr) lnet_net_unlock(rtr->lpni_cpt); rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL, - LNET_PROTO_PING_MATCHBITS, 0); + LNET_PROTO_PING_MATCHBITS, 0, false); lnet_net_lock(rtr->lpni_cpt); if (rc != 0) diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c index 4e1c504..17e7e60 100644 --- a/lnet/selftest/rpc.c +++ b/lnet/selftest/rpc.c @@ -422,7 +422,7 @@ srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, } else { LASSERT ((options & LNET_MD_OP_GET) != 0); - rc = LNetGet(self, *mdh, peer, portal, matchbits, 0); + rc = LNetGet(self, *mdh, peer, portal, matchbits, 0, false); } if (rc != 0) { diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 29d10e3..fb6a282 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -231,7 +231,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc) desc->bd_portal, mbits, 0, 0); else rc = LNetGet(self_nid, desc->bd_mds[posted_md], - peer_id, desc->bd_portal, mbits, 0); + peer_id, desc->bd_portal, mbits, 0, false); posted_md++; if (rc != 0) {