lnet_ni_send(struct lnet_ni *ni, struct lnet_msg *msg)
{
void *priv = msg->msg_private;
- int rc;
+ int rc;
LASSERT (!in_interrupt ());
LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
(msg->msg_txcredit && msg->msg_peertxcredit));
rc = (ni->ni_net->net_lnd->lnd_send)(ni, priv, msg);
- if (rc < 0)
+ if (rc < 0) {
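+ /* the failure is terminal; prevent lnet_finalize()
+ * from queueing this message for a resend */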
+ msg->msg_no_resend = true;
lnet_finalize(msg, rc);
+ }
}
/* NB: returns 1 when alive, 0 when dead, negative when error;
* may drop the lnet_net_lock */
static int
-lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp)
+lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp,
+ struct lnet_msg *msg)
{
time64_t now = ktime_get_seconds();
return 1;
/*
+ * If we're resending a message, let's attempt to send it even if
+ * the peer is down to fulfill our resend quota on the message
+ */
+ if (msg->msg_retry_count > 0)
+ return 1;
+
+ /*
* Peer appears dead, but we should avoid frequent NI queries (at
* most once per lnet_queryinterval seconds).
*/
/* NB 'lp' is always the next hop */
if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
- lnet_peer_alive_locked(ni, lp) == 0) {
- the_lnet.ln_counters[cpt]->drop_count++;
- the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
+ lnet_peer_alive_locked(ni, lp, msg) == 0) {
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length +=
+ msg->msg_len;
lnet_net_unlock(cpt);
if (msg->msg_txpeer)
lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
CNETERR("Dropping message for %s: peer not alive\n",
libcfs_id2str(msg->msg_target));
+ msg->msg_health_status = LNET_MSG_STATUS_LOCAL_DROPPED;
if (do_send)
lnet_finalize(msg, -EHOSTUNREACH);
CNETERR("Aborting message for %s: LNetM[DE]Unlink() already "
"called on the MD/ME.\n",
libcfs_id2str(msg->msg_target));
- if (do_send)
+ if (do_send) {
+ msg->msg_no_resend = true;
lnet_finalize(msg, -ECANCELED);
+ }
lnet_net_lock(cpt);
return -ECANCELED;
}
}
+ /* unset the msg_tx_delayed flag as we're going to send it now */
+ msg->msg_tx_delayed = 0;
+
if (do_send) {
lnet_net_unlock(cpt);
lnet_ni_send(ni, msg);
msg->msg_niov = rbp->rbp_npages;
msg->msg_kiov = &rb->rb_kiov[0];
+ /* unset the msg_rx_delayed flag since we're receiving the message */
+ msg->msg_rx_delayed = 0;
+
if (do_recv) {
int cpt = msg->msg_rx_cpt;
}
if (txpeer != NULL) {
- /*
- * TODO:
- * Once the patch for the health comes in we need to set
- * the health of the peer ni to bad when we fail to send
- * a message.
- * int status = msg->msg_ev.status;
- * if (status != 0)
- * lnet_set_peer_ni_health_locked(txpeer, false)
- */
msg->msg_txpeer = NULL;
lnet_peer_ni_decref_locked(txpeer);
}
lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
0, 0, 0, msg->msg_hdr.payload_length);
list_del_init(&msg->msg_list);
+ msg->msg_no_resend = true;
lnet_finalize(msg, -ECANCELED);
}
struct lnet_ni *ni = NULL;
unsigned int shortest_distance;
int best_credits;
+ int best_healthv;
/*
* If there is no peer_ni that we can send to on this network,
if (best_ni == NULL) {
shortest_distance = UINT_MAX;
best_credits = INT_MIN;
+ best_healthv = 0;
} else {
shortest_distance = cfs_cpt_distance(lnet_cpt_table(), md_cpt,
best_ni->ni_dev_cpt);
best_credits = atomic_read(&best_ni->ni_tx_credits);
+ best_healthv = atomic_read(&best_ni->ni_healthv);
}
while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
unsigned int distance;
int ni_credits;
-
- if (!lnet_is_ni_healthy_locked(ni))
- continue;
+ int ni_healthv;
+ int ni_fatal;
ni_credits = atomic_read(&ni->ni_tx_credits);
+ ni_healthv = atomic_read(&ni->ni_healthv);
+ ni_fatal = atomic_read(&ni->ni_fatal_error_on);
/*
* calculate the distance from the CPT on which
distance = lnet_numa_range;
/*
- * Select on shorter distance, then available
+ * Select on health, shorter distance, available
* credits, then round-robin.
*/
- if (distance > shortest_distance) {
+ if (ni_fatal) {
+ continue;
+ } else if (ni_healthv < best_healthv) {
+ continue;
+ } else if (ni_healthv > best_healthv) {
+ best_healthv = ni_healthv;
+ /*
+ * If we're going to prefer this ni because it's
+ * the healthiest, then we should set the
+ * shortest_distance in the algorithm in case
+ * there are multiple NIs with the same health but
+ * different distances.
+ */
+ if (distance < shortest_distance)
+ shortest_distance = distance;
+ } else if (distance > shortest_distance) {
continue;
} else if (distance < shortest_distance) {
shortest_distance = distance;
} else if (ni_credits < best_credits) {
continue;
} else if (ni_credits == best_credits) {
- if (best_ni && (best_ni)->ni_seq <= ni->ni_seq)
+ if (best_ni && best_ni->ni_seq <= ni->ni_seq)
continue;
}
best_ni = ni;
__u32 send_case = sd->sd_send_case;
int rc;
__u32 routing = send_case & REMOTE_DST;
+ struct lnet_rsp_tracker *rspt;
/*
* Increment sequence number of the selected peer so that we
msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
}
+ /*
+ * if we have response tracker block update it with the next hop
+ * nid
+ */
+ if (msg->msg_md) {
+ rspt = msg->msg_md->md_rspt_ptr;
+ if (rspt) {
+ rspt->rspt_next_hop_nid = msg->msg_txpeer->lpni_nid;
+ CDEBUG(D_NET, "rspt_next_hop_nid = %s\n",
+ libcfs_nid2str(rspt->rspt_next_hop_nid));
+ }
+ }
+
rc = lnet_post_send_locked(msg, 0);
if (!rc)
- CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s\n",
+ CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s try# %d\n",
libcfs_nid2str(msg->msg_hdr.src_nid),
libcfs_nid2str(msg->msg_txni->ni_nid),
libcfs_nid2str(sd->sd_src_nid),
libcfs_nid2str(msg->msg_hdr.dest_nid),
libcfs_nid2str(sd->sd_dst_nid),
libcfs_nid2str(msg->msg_txpeer->lpni_nid),
- lnet_msgtyp2str(msg->msg_type));
+ lnet_msgtyp2str(msg->msg_type), msg->msg_retry_count);
return rc;
}
int best_lpni_credits = INT_MIN;
bool preferred = false;
bool ni_is_pref;
+ int best_lpni_healthv = 0;
+ int lpni_healthv;
while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
/*
ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
best_ni->ni_nid);
+ lpni_healthv = atomic_read(&lpni->lpni_healthv);
+
CDEBUG(D_NET, "%s ni_is_pref = %d\n",
libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
lpni->lpni_txcredits, best_lpni_credits,
lpni->lpni_seq, best_lpni->lpni_seq);
+ /* pick the healthiest peer ni */
+ if (lpni_healthv < best_lpni_healthv) {
+ continue;
+ } else if (lpni_healthv > best_lpni_healthv) {
+ best_lpni_healthv = lpni_healthv;
/* if this is a preferred peer use it */
- if (!preferred && ni_is_pref) {
+ } else if (!preferred && ni_is_pref) {
preferred = true;
} else if (preferred && !ni_is_pref) {
/*
}
/*
+ * Cache the original src_nid. If we need to resend the message
+ * then we'll need to know whether the src_nid was originally
+ * specified for this message. If it was originally specified,
+ * then we need to keep using the same src_nid since it's
+ * continuing the same sequence of messages.
+ */
+ msg->msg_src_nid_param = src_nid;
+
+ /*
* Now that we have a peer_ni, check if we want to discover
* the peer. Traffic to the LNET_RESERVED_PORTAL should not
* trigger discovery.
/* The peer may have changed. */
peer = lpni->lpni_peer_net->lpn_peer;
/* queue message and return */
- msg->msg_src_nid_param = src_nid;
msg->msg_rtr_nid_param = rtr_nid;
msg->msg_sending = 0;
list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
}
lnet_peer_ni_decref_locked(lpni);
- /* If peer is not healthy then can not send anything to it */
- if (!lnet_is_peer_healthy_locked(peer)) {
- lnet_net_unlock(cpt);
- return -EHOSTUNREACH;
- }
-
/*
* Identify the different send cases
*/
else
send_case |= REMOTE_DST;
- if (!lnet_peer_is_multi_rail(peer))
+ /*
+ * if this is a non-MR peer or if we're recovering a peer ni then
+ * let's consider this an NMR case so we can hit the destination
+ * NID.
+ */
+ if (!lnet_peer_is_multi_rail(peer) || msg->msg_recovery)
send_case |= NMR_DST;
else
send_case |= MR_DST;
* in the future
*/
/* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
- LASSERT (msg->msg_txpeer == NULL);
- LASSERT (!msg->msg_sending);
- LASSERT (!msg->msg_target_is_router);
- LASSERT (!msg->msg_receiving);
+ LASSERT(msg->msg_txpeer == NULL);
+ LASSERT(msg->msg_txni == NULL);
+ LASSERT(!msg->msg_sending);
+ LASSERT(!msg->msg_target_is_router);
+ LASSERT(!msg->msg_receiving);
msg->msg_sending = 1;
return 0;
}
+enum lnet_mt_event_type {
+ MT_TYPE_LOCAL_NI = 0,
+ MT_TYPE_PEER_NI
+};
+
+struct lnet_mt_event_info {
+ enum lnet_mt_event_type mt_type;
+ lnet_nid_t mt_nid;
+};
+
+void
+lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
+{
+ struct lnet_rsp_tracker *rspt;
+
+ /*
+ * msg has a refcount on the MD so the MD is not going away.
+ * The rspt queue for the cpt is protected by
+ * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
+ */
+ lnet_res_lock(cpt);
+ if (!md->md_rspt_ptr) {
+ lnet_res_unlock(cpt);
+ return;
+ }
+ rspt = md->md_rspt_ptr;
+ md->md_rspt_ptr = NULL;
+
+ /* debug code */
+ LASSERT(rspt->rspt_cpt == cpt);
+
+ /*
+ * invalidate the handle to indicate that a response has been
+ * received, which will then lead the monitor thread to clean up
+ * the rspt block.
+ */
+ LNetInvalidateMDHandle(&rspt->rspt_mdh);
+ lnet_res_unlock(cpt);
+}
+
+static void
+lnet_finalize_expired_responses(bool force)
+{
+ struct lnet_libmd *md;
+ struct list_head local_queue;
+ struct lnet_rsp_tracker *rspt, *tmp;
+ int i;
+
+ if (the_lnet.ln_mt_rstq == NULL)
+ return;
+
+ cfs_cpt_for_each(i, lnet_cpt_table()) {
+ INIT_LIST_HEAD(&local_queue);
+
+ lnet_net_lock(i);
+ if (!the_lnet.ln_mt_rstq[i]) {
+ lnet_net_unlock(i);
+ continue;
+ }
+ list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
+ lnet_net_unlock(i);
+
+ list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
+ /*
+ * The rspt mdh is invalidated when a response is
+ * received or whenever we want to discard the
+ * block; the monitor thread then walks the queue
+ * and cleans up any rspts with an invalid mdh.
+ * The walk stops at the first unexpired rspt
+ * block, so rspt blocks whose responses have
+ * already arrived may linger in the queue until
+ * they are eventually cleaned up.
+ */
+ lnet_res_lock(i);
+ if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+ lnet_res_unlock(i);
+ list_del_init(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, i);
+ continue;
+ }
+
+ if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 ||
+ force) {
+ struct lnet_peer_ni *lpni;
+ lnet_nid_t nid;
+
+ md = lnet_handle2md(&rspt->rspt_mdh);
+ if (!md) {
+ LNetInvalidateMDHandle(&rspt->rspt_mdh);
+ lnet_res_unlock(i);
+ list_del_init(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, i);
+ continue;
+ }
+ LASSERT(md->md_rspt_ptr == rspt);
+ md->md_rspt_ptr = NULL;
+ lnet_res_unlock(i);
+
+ lnet_net_lock(i);
+ the_lnet.ln_counters[i]->lct_health.lch_response_timeout_count++;
+ lnet_net_unlock(i);
+
+ list_del_init(&rspt->rspt_on_list);
+
+ nid = rspt->rspt_next_hop_nid;
+
+ CNETERR("Response timed out: md = %p: nid = %s\n",
+ md, libcfs_nid2str(nid));
+ LNetMDUnlink(rspt->rspt_mdh);
+ lnet_rspt_free(rspt, i);
+
+ /*
+ * If there is a timeout on the response
+ * from the next hop decrement its health
+ * value so that we don't use it
+ */
+ lnet_net_lock(0);
+ lpni = lnet_find_peer_ni_locked(nid);
+ if (lpni) {
+ lnet_handle_remote_failure_locked(lpni);
+ lnet_peer_ni_decref_locked(lpni);
+ }
+ lnet_net_unlock(0);
+ } else {
+ lnet_res_unlock(i);
+ break;
+ }
+ }
+
+ lnet_net_lock(i);
+ if (!list_empty(&local_queue))
+ list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
+ lnet_net_unlock(i);
+ }
+}
+
+static void
+lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
+{
+ struct lnet_msg *msg;
+
+ while (!list_empty(resendq)) {
+ struct lnet_peer_ni *lpni;
+
+ msg = list_entry(resendq->next, struct lnet_msg,
+ msg_list);
+
+ list_del_init(&msg->msg_list);
+
+ lpni = lnet_find_peer_ni_locked(le64_to_cpu(msg->msg_hdr.dest_nid));
+ if (!lpni) {
+ lnet_net_unlock(cpt);
+ CERROR("Expected that a peer is already created for %s\n",
+ libcfs_nid2str(le64_to_cpu(msg->msg_hdr.dest_nid)));
+ msg->msg_no_resend = true;
+ lnet_finalize(msg, -EFAULT);
+ lnet_net_lock(cpt);
+ } else {
+ struct lnet_peer *peer;
+ int rc;
+ lnet_nid_t src_nid = LNET_NID_ANY;
+
+ /*
+ * if this message is not being routed and the
+ * peer is non-MR then we must use the same
+ * src_nid that was used in the original send.
+ * Otherwise, if we're routing the message (i.e.
+ * we're a router) then we can use any of our
+ * local interfaces. It doesn't matter to the
+ * final destination.
+ */
+ peer = lpni->lpni_peer_net->lpn_peer;
+ if (!msg->msg_routing &&
+ !lnet_peer_is_multi_rail(peer))
+ src_nid = le64_to_cpu(msg->msg_hdr.src_nid);
+
+ /*
+ * If we originally specified a src NID, then we
+ * must attempt to reuse it in the resend as well.
+ */
+ if (msg->msg_src_nid_param != LNET_NID_ANY)
+ src_nid = msg->msg_src_nid_param;
+ lnet_peer_ni_decref_locked(lpni);
+
+ lnet_net_unlock(cpt);
+ CDEBUG(D_NET, "resending %s->%s: %s recovery %d try# %d\n",
+ libcfs_nid2str(src_nid),
+ libcfs_id2str(msg->msg_target),
+ lnet_msgtyp2str(msg->msg_type),
+ msg->msg_recovery,
+ msg->msg_retry_count);
+ rc = lnet_send(src_nid, msg, LNET_NID_ANY);
+ if (rc) {
+ CERROR("Error sending %s to %s: %d\n",
+ lnet_msgtyp2str(msg->msg_type),
+ libcfs_id2str(msg->msg_target), rc);
+ msg->msg_no_resend = true;
+ lnet_finalize(msg, rc);
+ }
+ lnet_net_lock(cpt);
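+ /* only count resends that were successfully handed off */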
+ if (!rc)
+ the_lnet.ln_counters[cpt]->lct_health.lch_resend_count++;
+ }
+ }
+}
+
+static void
+lnet_resend_pending_msgs(void)
+{
+ int i;
+
+ cfs_cpt_for_each(i, lnet_cpt_table()) {
+ lnet_net_lock(i);
+ lnet_resend_pending_msgs_locked(the_lnet.ln_mt_resendqs[i], i);
+ lnet_net_unlock(i);
+ }
+}
+
+/* called with cpt and ni_lock held */
+static void
+lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt, bool force)
+{
+ struct lnet_handle_md recovery_mdh;
+
+ LNetInvalidateMDHandle(&recovery_mdh);
+
+ if (ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING ||
+ force) {
+ recovery_mdh = ni->ni_ping_mdh;
+ LNetInvalidateMDHandle(&ni->ni_ping_mdh);
+ }
+ lnet_ni_unlock(ni);
+ lnet_net_unlock(cpt);
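+ /*
+ * unlink the MD only after dropping the locks, since
+ * LNetMDUnlink() can fire the unlink event and its
+ * handler takes these locks again
+ */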
+ if (!LNetMDHandleIsInvalid(recovery_mdh))
+ LNetMDUnlink(recovery_mdh);
+ lnet_net_lock(cpt);
+ lnet_ni_lock(ni);
+}
+
+static void
+lnet_recover_local_nis(void)
+{
+ struct lnet_mt_event_info *ev_info;
+ struct list_head processed_list;
+ struct list_head local_queue;
+ struct lnet_handle_md mdh;
+ struct lnet_ni *tmp;
+ struct lnet_ni *ni;
+ lnet_nid_t nid;
+ int healthv;
+ int rc;
+
+ INIT_LIST_HEAD(&local_queue);
+ INIT_LIST_HEAD(&processed_list);
+
+ /*
+ * splice the recovery queue on a local queue. We will iterate
+ * through the local queue and update it as needed. Once we're
+ * done with the traversal, we'll splice the local queue back on
+ * the head of the ln_mt_localNIRecovq. Any newly added local NIs
+ * will be traversed in the next iteration.
+ */
+ lnet_net_lock(0);
+ list_splice_init(&the_lnet.ln_mt_localNIRecovq,
+ &local_queue);
+ lnet_net_unlock(0);
+
+ list_for_each_entry_safe(ni, tmp, &local_queue, ni_recovery) {
+ /*
+ * if an NI is being deleted or it is now healthy, there
+ * is no need to keep it around in the recovery queue.
+ * The monitor thread is the only thread responsible for
+ * removing the NI from the recovery queue.
+ * Multiple threads can be adding NIs to the recovery
+ * queue.
+ */
+ healthv = atomic_read(&ni->ni_healthv);
+
+ lnet_net_lock(0);
+ lnet_ni_lock(ni);
+ if (ni->ni_state != LNET_NI_STATE_ACTIVE ||
+ healthv == LNET_MAX_HEALTH_VALUE) {
+ list_del_init(&ni->ni_recovery);
+ lnet_unlink_ni_recovery_mdh_locked(ni, 0, false);
+ lnet_ni_unlock(ni);
+ lnet_ni_decref_locked(ni, 0);
+ lnet_net_unlock(0);
+ continue;
+ }
+
+ /*
+ * if the local NI failed recovery we must unlink the md.
+ * But we want to keep the local_ni on the recovery queue
+ * so we can continue the attempts to recover it.
+ */
+ if (ni->ni_recovery_state & LNET_NI_RECOVERY_FAILED) {
+ lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
+ ni->ni_recovery_state &= ~LNET_NI_RECOVERY_FAILED;
+ }
+
+ lnet_ni_unlock(ni);
+ lnet_net_unlock(0);
+
+ CDEBUG(D_NET, "attempting to recover local ni: %s\n",
+ libcfs_nid2str(ni->ni_nid));
+
+ lnet_ni_lock(ni);
+ if (!(ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING)) {
+ ni->ni_recovery_state |= LNET_NI_RECOVERY_PENDING;
+ lnet_ni_unlock(ni);
+
+ LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
+ if (!ev_info) {
+ CERROR("out of memory. Can't recover %s\n",
+ libcfs_nid2str(ni->ni_nid));
+ lnet_ni_lock(ni);
+ ni->ni_recovery_state &=
+ ~LNET_NI_RECOVERY_PENDING;
+ lnet_ni_unlock(ni);
+ continue;
+ }
+
+ mdh = ni->ni_ping_mdh;
+ /*
+ * Invalidate the ni mdh in case it's deleted.
+ * We'll unlink the mdh in this case below.
+ */
+ LNetInvalidateMDHandle(&ni->ni_ping_mdh);
+ nid = ni->ni_nid;
+
+ /*
+ * Remove the NI from the local queue and drop our
+ * reference to it while we're recovering it. The NI
+ * could be deleted in the meantime: if we kept our
+ * reference, the deletion code would loop waiting
+ * for the reference count to drop while holding the
+ * ln_mutex_lock(), and when we look up the peer to
+ * send to in lnet_select_pathway() we would try to
+ * take the ln_mutex_lock() as well, leading to a
+ * deadlock. By dropping the refcount and removing
+ * the NI from the list, we allow it to be deleted;
+ * we then use the cached NID to look it up again.
+ * If it's gone, we just continue examining the rest
+ * of the queue.
+ */
+ lnet_net_lock(0);
+ list_del_init(&ni->ni_recovery);
+ lnet_ni_decref_locked(ni, 0);
+ lnet_net_unlock(0);
+
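+ /*
+ * stash the type and NID in the event info so the event
+ * handler can find this NI again when the ping completes
+ */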
+ ev_info->mt_type = MT_TYPE_LOCAL_NI;
+ ev_info->mt_nid = nid;
+ rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+ ev_info, the_lnet.ln_mt_eqh, true);
+ /* lookup the nid again */
+ lnet_net_lock(0);
+ ni = lnet_nid2ni_locked(nid, 0);
+ if (!ni) {
+ /*
+ * the NI has been deleted when we dropped
+ * the ref count
+ */
+ lnet_net_unlock(0);
+ LNetMDUnlink(mdh);
+ continue;
+ }
+ /*
+ * Same note as in lnet_recover_peer_nis(). When
+ * we're sending the ping, the NI is free to be
+ * deleted or manipulated. By this point it
+ * could've been added back on the recovery queue,
+ * and a refcount taken on it.
+ * So we can't just add it blindly again or we'll
+ * corrupt the queue. We must check under lock if
+ * it's not on any list and if not then add it
+ * to the processed list, which will eventually be
+ * spliced back on to the recovery queue.
+ */
+ ni->ni_ping_mdh = mdh;
+ if (list_empty(&ni->ni_recovery)) {
+ list_add_tail(&ni->ni_recovery, &processed_list);
+ lnet_ni_addref_locked(ni, 0);
+ }
+ lnet_net_unlock(0);
+
+ lnet_ni_lock(ni);
+ if (rc)
+ ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
+ }
+ lnet_ni_unlock(ni);
+ }
+
+ /*
+ * put back the remaining NIs on the ln_mt_localNIRecovq to be
+ * reexamined in the next iteration.
+ */
+ list_splice_init(&processed_list, &local_queue);
+ lnet_net_lock(0);
+ list_splice(&local_queue, &the_lnet.ln_mt_localNIRecovq);
+ lnet_net_unlock(0);
+}
+
+static struct list_head **
+lnet_create_array_of_queues(void)
+{
+ struct list_head **qs;
+ struct list_head *q;
+ int i;
+
+ qs = cfs_percpt_alloc(lnet_cpt_table(),
+ sizeof(struct list_head));
+ if (!qs) {
+ CERROR("Failed to allocate queues\n");
+ return NULL;
+ }
+
+ cfs_percpt_for_each(q, i, qs)
+ INIT_LIST_HEAD(q);
+
+ return qs;
+}
+
+static int
+lnet_resendqs_create(void)
+{
+ struct list_head **resendqs;
+
+ resendqs = lnet_create_array_of_queues();
+ if (!resendqs)
+ return -ENOMEM;
+
+ lnet_net_lock(LNET_LOCK_EX);
+ the_lnet.ln_mt_resendqs = resendqs;
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ return 0;
+}
+
+static void
+lnet_clean_local_ni_recoveryq(void)
+{
+ struct lnet_ni *ni;
+
+ /* This is only called when the monitor thread has stopped */
+ lnet_net_lock(0);
+
+ while (!list_empty(&the_lnet.ln_mt_localNIRecovq)) {
+ ni = list_entry(the_lnet.ln_mt_localNIRecovq.next,
+ struct lnet_ni, ni_recovery);
+ list_del_init(&ni->ni_recovery);
+ lnet_ni_lock(ni);
+ lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
+ lnet_ni_unlock(ni);
+ lnet_ni_decref_locked(ni, 0);
+ }
+
+ lnet_net_unlock(0);
+}
+
+static void
+lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt,
+ bool force)
+{
+ struct lnet_handle_md recovery_mdh;
+
+ LNetInvalidateMDHandle(&recovery_mdh);
+
+ if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING || force) {
+ recovery_mdh = lpni->lpni_recovery_ping_mdh;
+ LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+ }
+ spin_unlock(&lpni->lpni_lock);
+ lnet_net_unlock(cpt);
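+ /* as in lnet_unlink_ni_recovery_mdh_locked(), unlink outside the locks */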
+ if (!LNetMDHandleIsInvalid(recovery_mdh))
+ LNetMDUnlink(recovery_mdh);
+ lnet_net_lock(cpt);
+ spin_lock(&lpni->lpni_lock);
+}
+
+static void
+lnet_clean_peer_ni_recoveryq(void)
+{
+ struct lnet_peer_ni *lpni, *tmp;
+
+ lnet_net_lock(LNET_LOCK_EX);
+
+ list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_mt_peerNIRecovq,
+ lpni_recovery) {
+ list_del_init(&lpni->lpni_recovery);
+ spin_lock(&lpni->lpni_lock);
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX, true);
+ spin_unlock(&lpni->lpni_lock);
+ lnet_peer_ni_decref_locked(lpni);
+ }
+
+ lnet_net_unlock(LNET_LOCK_EX);
+}
+
+static void
+lnet_clean_resendqs(void)
+{
+ struct lnet_msg *msg, *tmp;
+ struct list_head msgs;
+ int i;
+
+ INIT_LIST_HEAD(&msgs);
+
+ cfs_cpt_for_each(i, lnet_cpt_table()) {
+ lnet_net_lock(i);
+ list_splice_init(the_lnet.ln_mt_resendqs[i], &msgs);
+ lnet_net_unlock(i);
+ list_for_each_entry_safe(msg, tmp, &msgs, msg_list) {
+ list_del_init(&msg->msg_list);
+ msg->msg_no_resend = true;
+ lnet_finalize(msg, -ESHUTDOWN);
+ }
+ }
+
+ cfs_percpt_free(the_lnet.ln_mt_resendqs);
+}
+
+static void
+lnet_recover_peer_nis(void)
+{
+ struct lnet_mt_event_info *ev_info;
+ struct list_head processed_list;
+ struct list_head local_queue;
+ struct lnet_handle_md mdh;
+ struct lnet_peer_ni *lpni;
+ struct lnet_peer_ni *tmp;
+ lnet_nid_t nid;
+ int healthv;
+ int rc;
+
+ INIT_LIST_HEAD(&local_queue);
+ INIT_LIST_HEAD(&processed_list);
+
+ /*
+ * Always use cpt 0 for locking across all interactions with
+ * ln_mt_peerNIRecovq
+ */
+ lnet_net_lock(0);
+ list_splice_init(&the_lnet.ln_mt_peerNIRecovq,
+ &local_queue);
+ lnet_net_unlock(0);
+
+ list_for_each_entry_safe(lpni, tmp, &local_queue,
+ lpni_recovery) {
+ /*
+ * The same protection strategy is used here as is in the
+ * local recovery case.
+ */
+ lnet_net_lock(0);
+ healthv = atomic_read(&lpni->lpni_healthv);
+ spin_lock(&lpni->lpni_lock);
+ if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
+ healthv == LNET_MAX_HEALTH_VALUE) {
+ list_del_init(&lpni->lpni_recovery);
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, false);
+ spin_unlock(&lpni->lpni_lock);
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(0);
+ continue;
+ }
+
+ /*
+ * If the peer NI has failed recovery we must unlink the
+ * md. But we want to keep the peer ni on the recovery
+ * queue so we can try to continue recovering it
+ */
+ if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_FAILED) {
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, true);
+ lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_FAILED;
+ }
+
+ spin_unlock(&lpni->lpni_lock);
+ lnet_net_unlock(0);
+
+ /*
+ * NOTE: we're racing with peer deletion from user space.
+ * It's possible that a peer is deleted after we check its
+ * state. In this case the recovery can create a new peer.
+ */
+ spin_lock(&lpni->lpni_lock);
+ if (!(lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) &&
+ !(lpni->lpni_state & LNET_PEER_NI_DELETING)) {
+ lpni->lpni_state |= LNET_PEER_NI_RECOVERY_PENDING;
+ spin_unlock(&lpni->lpni_lock);
+
+ LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
+ if (!ev_info) {
+ CERROR("out of memory. Can't recover %s\n",
+ libcfs_nid2str(lpni->lpni_nid));
+ spin_lock(&lpni->lpni_lock);
+ lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+ spin_unlock(&lpni->lpni_lock);
+ continue;
+ }
+
+ /* look at the comments in lnet_recover_local_nis() */
+ mdh = lpni->lpni_recovery_ping_mdh;
+ LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+ nid = lpni->lpni_nid;
+ lnet_net_lock(0);
+ list_del_init(&lpni->lpni_recovery);
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(0);
+
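+ /*
+ * stash the type and NID so the event handler can look
+ * the peer NI up again once the ping completes
+ */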
+ ev_info->mt_type = MT_TYPE_PEER_NI;
+ ev_info->mt_nid = nid;
+ rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+ ev_info, the_lnet.ln_mt_eqh, true);
+ lnet_net_lock(0);
+ /*
+ * lnet_find_peer_ni_locked() grabs a refcount for
+ * us. No need to take it explicitly.
+ */
+ lpni = lnet_find_peer_ni_locked(nid);
+ if (!lpni) {
+ lnet_net_unlock(0);
+ LNetMDUnlink(mdh);
+ continue;
+ }
+
+ lpni->lpni_recovery_ping_mdh = mdh;
+ /*
+ * While we're unlocked the lpni could've been
+ * readded on the recovery queue. In this case we
+ * don't need to add it to the local queue, since
+ * it's already on there and the thread that added
+ * it would've incremented the refcount on the
+ * peer, which means we need to decref the refcount
+ * that was implicitly grabbed by find_peer_ni_locked.
+ * Otherwise, if the lpni is still not on
+ * the recovery queue, then we'll add it to the
+ * processed list.
+ */
+ if (list_empty(&lpni->lpni_recovery))
+ list_add_tail(&lpni->lpni_recovery, &processed_list);
+ else
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(0);
+
+ spin_lock(&lpni->lpni_lock);
+ if (rc)
+ lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+ }
+ spin_unlock(&lpni->lpni_lock);
+ }
+
+ list_splice_init(&processed_list, &local_queue);
+ lnet_net_lock(0);
+ list_splice(&local_queue, &the_lnet.ln_mt_peerNIRecovq);
+ lnet_net_unlock(0);
+}
+
+static int
+lnet_monitor_thread(void *arg)
+{
+ time64_t recovery_timeout = 0;
+ time64_t rsp_timeout = 0;
+ int interval;
+ time64_t now;
+
+ /*
+ * The monitor thread takes care of the following:
+ * 1. Checks the aliveness of routers
+ * 2. Resends any messages on the resend queue
+ * 3. Expires any response trackers that have timed out
+ * 4. Pings any NIs on the local NI recovery queue
+ * 5. Pings any peer NIs on the peer NI recovery queue
+ */
+ cfs_block_allsigs();
+
+ while (the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING) {
+ now = ktime_get_real_seconds();
+
+ if (lnet_router_checker_active())
+ lnet_check_routers();
+
+ lnet_resend_pending_msgs();
+
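+ /*
+ * expire tracked responses at least twice per
+ * transaction timeout, so a timed-out response is
+ * reaped within half a timeout of its deadline
+ */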
+ if (now >= rsp_timeout) {
+ lnet_finalize_expired_responses(false);
+ rsp_timeout = now + (lnet_transaction_timeout / 2);
+ }
+
+ if (now >= recovery_timeout) {
+ lnet_recover_local_nis();
+ lnet_recover_peer_nis();
+ recovery_timeout = now + lnet_recovery_interval;
+ }
+
+ /*
+ * TODO do we need to check if we should sleep without
+ * timeout? Technically, an active system will always
+ * have messages in flight so this check will always
+ * evaluate to false. And on an idle system do we care
+ * if we wake up every 1 second? Although, we've seen
+ * cases where we get a complaint that an idle thread
+ * is waking up unnecessarily.
+ */
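+ /*
+ * sleep for the shorter of the two service intervals so
+ * neither the resend check nor recovery is starved
+ */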
+ interval = min(lnet_recovery_interval,
+ lnet_transaction_timeout / 2);
+ wait_event_interruptible_timeout(the_lnet.ln_mt_waitq,
+ false,
+ cfs_time_seconds(interval));
+ }
+
+ /* clean up the router checker */
+ lnet_prune_rc_data(1);
+
+ /* Shutting down */
+ the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+
+ /* signal that the monitor thread is exiting */
+ up(&the_lnet.ln_mt_signal);
+
+ return 0;
+}
+
+/*
+ * lnet_send_ping
+ * Sends a ping.
+ * Returns == 0 if success
+ * Returns > 0 if LNetMDBind or prior fails
+ * Returns < 0 if LNetGet fails
+ */
+int
+lnet_send_ping(lnet_nid_t dest_nid,
+ struct lnet_handle_md *mdh, int nnis,
+ void *user_data, struct lnet_handle_eq eqh, bool recovery)
+{
+ struct lnet_md md = { NULL };
+ struct lnet_process_id id;
+ struct lnet_ping_buffer *pbuf;
+ int rc;
+
+ if (dest_nid == LNET_NID_ANY) {
+ rc = -EHOSTUNREACH;
+ goto fail_error;
+ }
+
+ pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
+ if (!pbuf) {
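+ /* NB: positive rc marks a failure prior to LNetGet(); see above */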
+ rc = ENOMEM;
+ goto fail_error;
+ }
+
+ /* initialize md content */
+ md.start = &pbuf->pb_info;
+ md.length = LNET_PING_INFO_SIZE(nnis);
+ md.threshold = 2; /* GET/REPLY */
+ md.max_size = 0;
+ md.options = LNET_MD_TRUNCATE;
+ md.user_ptr = user_data;
+ md.eq_handle = eqh;
+
+ rc = LNetMDBind(md, LNET_UNLINK, mdh);
+ if (rc) {
+ lnet_ping_buffer_decref(pbuf);
+ CERROR("Can't bind MD: %d\n", rc);
+ rc = -rc; /* change the rc to positive */
+ goto fail_error;
+ }
+ id.pid = LNET_PID_LUSTRE;
+ id.nid = dest_nid;
+
+ rc = LNetGet(LNET_NID_ANY, *mdh, id,
+ LNET_RESERVED_PORTAL,
+ LNET_PROTO_PING_MATCHBITS, 0, recovery);
+
+ if (rc)
+ goto fail_unlink_md;
+
+ return 0;
+
+fail_unlink_md:
+ LNetMDUnlink(*mdh);
+ LNetInvalidateMDHandle(mdh);
+fail_error:
+ return rc;
+}
+
+static void
+lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
+ int status)
+{
+ lnet_nid_t nid = ev_info->mt_nid;
+
+ if (ev_info->mt_type == MT_TYPE_LOCAL_NI) {
+ struct lnet_ni *ni;
+
+ lnet_net_lock(0);
+ ni = lnet_nid2ni_locked(nid, 0);
+ if (!ni) {
+ lnet_net_unlock(0);
+ return;
+ }
+ lnet_ni_lock(ni);
+ ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
+ if (status)
+ ni->ni_recovery_state |= LNET_NI_RECOVERY_FAILED;
+ lnet_ni_unlock(ni);
+ lnet_net_unlock(0);
+
+ if (status != 0) {
+ CERROR("local NI (%s) recovery failed with %d\n",
+ libcfs_nid2str(nid), status);
+ return;
+ }
+ /*
+ * We need to increment the healthv of the NI here
+ * because the lnet_finalize() path doesn't have access
+ * to this NI, and gaining access there would mean
+ * carrying forward too much information.
+ * In the peer case, the healthv is incremented
+ * naturally in that path.
+ */
+ lnet_inc_healthv(&ni->ni_healthv);
+ } else {
+ struct lnet_peer_ni *lpni;
+ int cpt;
+
+ cpt = lnet_net_lock_current();
+ lpni = lnet_find_peer_ni_locked(nid);
+ if (!lpni) {
+ lnet_net_unlock(cpt);
+ return;
+ }
+ spin_lock(&lpni->lpni_lock);
+ lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+ if (status)
+ lpni->lpni_state |= LNET_PEER_NI_RECOVERY_FAILED;
+ spin_unlock(&lpni->lpni_lock);
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(cpt);
+
+ if (status != 0)
+ CERROR("peer NI (%s) recovery failed with %d\n",
+ libcfs_nid2str(nid), status);
+ }
+}
+
+static void
+lnet_mt_event_handler(struct lnet_event *event)
+{
+ struct lnet_mt_event_info *ev_info = event->md.user_ptr;
+ struct lnet_ping_buffer *pbuf;
+
+ /* TODO: remove assert */
+ LASSERT(event->type == LNET_EVENT_REPLY ||
+ event->type == LNET_EVENT_SEND ||
+ event->type == LNET_EVENT_UNLINK);
+
+ CDEBUG(D_NET, "Received event: %d status: %d\n", event->type,
+ event->status);
+
+ switch (event->type) {
+ case LNET_EVENT_UNLINK:
+ CDEBUG(D_NET, "%s recovery ping unlinked\n",
+ libcfs_nid2str(ev_info->mt_nid));
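+ /* fall through */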
+ case LNET_EVENT_REPLY:
+ lnet_handle_recovery_reply(ev_info, event->status);
+ break;
+ case LNET_EVENT_SEND:
+ CDEBUG(D_NET, "%s recovery message sent %s:%d\n",
+ libcfs_nid2str(ev_info->mt_nid),
+ (event->status) ? "unsuccessfully" :
+ "successfully", event->status);
+ lnet_handle_recovery_reply(ev_info, event->status);
+ break;
+ default:
+ CERROR("Unexpected event: %d\n", event->type);
+ break;
+ }
+ if (event->unlinked) {
+ LIBCFS_FREE(ev_info, sizeof(*ev_info));
+ pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
+ lnet_ping_buffer_decref(pbuf);
+ }
+}
+
+static int
+lnet_rsp_tracker_create(void)
+{
+ struct list_head **rstqs;
+
+ rstqs = lnet_create_array_of_queues();
+ if (!rstqs)
+ return -ENOMEM;
+
+ the_lnet.ln_mt_rstq = rstqs;
+
+ return 0;
+}
+
+static void
+lnet_rsp_tracker_clean(void)
+{
+ lnet_finalize_expired_responses(true);
+
+ cfs_percpt_free(the_lnet.ln_mt_rstq);
+ the_lnet.ln_mt_rstq = NULL;
+}
+
+int lnet_monitor_thr_start(void)
+{
+ int rc = 0;
+ struct task_struct *task;
+
+ if (the_lnet.ln_mt_state != LNET_MT_STATE_SHUTDOWN)
+ return -EALREADY;
+
+ rc = lnet_resendqs_create();
+ if (rc)
+ return rc;
+
+ rc = lnet_rsp_tracker_create();
+ if (rc)
+ goto clean_queues;
+
+ rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh);
+ if (rc != 0) {
+ CERROR("Can't allocate monitor thread EQ: %d\n", rc);
+ goto clean_queues;
+ }
+
+ /* Pre monitor thread start processing */
+ rc = lnet_router_pre_mt_start();
+ if (rc)
+ goto free_mem;
+
+ sema_init(&the_lnet.ln_mt_signal, 0);
+
+ the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING;
+ task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread");
+ if (IS_ERR(task)) {
+ rc = PTR_ERR(task);
+ CERROR("Can't start monitor thread: %d\n", rc);
+ goto clean_thread;
+ }
+
+ /* post monitor thread start processing */
+ lnet_router_post_mt_start();
+
+ return 0;
+
+clean_thread:
+ the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
+ /* block until event callback signals exit */
+ down(&the_lnet.ln_mt_signal);
+ /* clean up */
+ lnet_router_cleanup();
+free_mem:
+ the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+ lnet_rsp_tracker_clean();
+ lnet_clean_local_ni_recoveryq();
+ lnet_clean_peer_ni_recoveryq();
+ lnet_clean_resendqs();
+ LNetEQFree(the_lnet.ln_mt_eqh);
+ LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
+ return rc;
+clean_queues:
+ lnet_rsp_tracker_clean();
+ lnet_clean_local_ni_recoveryq();
+ lnet_clean_peer_ni_recoveryq();
+ lnet_clean_resendqs();
+ return rc;
+}
+
+void lnet_monitor_thr_stop(void)
+{
+ int rc;
+
+ if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
+ return;
+
+ LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
+ the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
+
+ /* tell the monitor thread that we're shutting down */
+ wake_up(&the_lnet.ln_mt_waitq);
+
+ /* block until monitor thread signals that it's done */
+ down(&the_lnet.ln_mt_signal);
+ LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN);
+
+ /* perform cleanup tasks */
+ lnet_router_cleanup();
+ lnet_rsp_tracker_clean();
+ lnet_clean_local_ni_recoveryq();
+ lnet_clean_peer_ni_recoveryq();
+ lnet_clean_resendqs();
+ rc = LNetEQFree(the_lnet.ln_mt_eqh);
+ LASSERT(rc == 0);
+}
+
void
lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob,
__u32 msg_type)
{
lnet_net_lock(cpt);
lnet_incr_stats(&ni->ni_stats, msg_type, LNET_STATS_TYPE_DROP);
- the_lnet.ln_counters[cpt]->drop_count++;
- the_lnet.ln_counters[cpt]->drop_length += nob;
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length += nob;
lnet_net_unlock(cpt);
lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
static int
lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg)
{
- void *private = msg->msg_private;
- struct lnet_hdr *hdr = &msg->msg_hdr;
+ void *private = msg->msg_private;
+ struct lnet_hdr *hdr = &msg->msg_hdr;
struct lnet_process_id src = {0};
- struct lnet_libmd *md;
- int rlength;
- int mlength;
- int cpt;
+ struct lnet_libmd *md;
+ int rlength;
+ int mlength;
+ int cpt;
cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
lnet_res_lock(cpt);
static int
lnet_parse_ack(struct lnet_ni *ni, struct lnet_msg *msg)
{
- struct lnet_hdr *hdr = &msg->msg_hdr;
+ struct lnet_hdr *hdr = &msg->msg_hdr;
struct lnet_process_id src = {0};
- struct lnet_libmd *md;
- int cpt;
+ struct lnet_libmd *md;
+ int cpt;
src.nid = hdr->src_nid;
src.pid = hdr->src_pid;
}
if (!list_empty(&the_lnet.ln_drop_rules) &&
- lnet_drop_rule_match(hdr)) {
+ lnet_drop_rule_match(hdr, NULL)) {
CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate"
"silent message loss\n",
libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt,
msg->msg_private, msg->msg_len,
msg->msg_type);
+
+ msg->msg_no_resend = true;
/*
* NB: message will not generate event because w/o attached MD,
* but we still should give error code so lnet_msg_decommit()
}
}
+static void
+lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt,
+ struct lnet_libmd *md, struct lnet_handle_md mdh)
+{
+ s64 timeout_ns;
+
+ /*
+ * MD has a refcount taken by message so it's not going away.
+ * The MD however can be looked up. We need to secure the access
+ * to the md_rspt_ptr by taking the res_lock.
+ * The rspt can be accessed without protection up to when it gets
+ * added to the list.
+ */
+
+ /* debug code */
+ LASSERT(md->md_rspt_ptr == NULL);
+
+ /* we'll use that same event in case we never get a response */
+ rspt->rspt_mdh = mdh;
+ rspt->rspt_cpt = cpt;
+ timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
+ rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns);
+
+ lnet_res_lock(cpt);
+ /* store the rspt so we can access it when we get the REPLY */
+ md->md_rspt_ptr = rspt;
+ lnet_res_unlock(cpt);
+
+ /*
+ * add to the list of tracked responses. It's added to tail of the
+ * list in order to expire all the older entries first.
+ */
+ lnet_net_lock(cpt);
+ list_add_tail(&rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]);
+ lnet_net_unlock(cpt);
+}
+
/**
* Initiate an asynchronous PUT operation.
*
__u64 match_bits, unsigned int offset,
__u64 hdr_data)
{
- struct lnet_msg *msg;
- struct lnet_libmd *md;
- int cpt;
- int rc;
+ struct lnet_msg *msg;
+ struct lnet_libmd *md;
+ int cpt;
+ int rc;
+ struct lnet_rsp_tracker *rspt = NULL;
LASSERT(the_lnet.ln_refcount > 0);
msg->msg_vmflush = !!memory_pressure_get();
cpt = lnet_cpt_of_cookie(mdh.cookie);
+
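+ /*
+ * only a PUT that requests an ACK expects a response,
+ * so only then do we allocate a tracker for it
+ */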
+ if (ack == LNET_ACK_REQ) {
+ rspt = lnet_rspt_alloc(cpt);
+ if (!rspt) {
+ CERROR("Dropping PUT to %s: ENOMEM on response tracker\n",
+ libcfs_id2str(target));
+ return -ENOMEM;
+ }
+ INIT_LIST_HEAD(&rspt->rspt_on_list);
+ }
+
lnet_res_lock(cpt);
md = lnet_handle2md(&mdh);
md->md_me->me_portal);
lnet_res_unlock(cpt);
+ LIBCFS_FREE(rspt, sizeof(*rspt));
lnet_msg_free(msg);
return -ENOENT;
}
lnet_build_msg_event(msg, LNET_EVENT_SEND);
+ if (ack == LNET_ACK_REQ)
+ lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
+
rc = lnet_send(self, msg, LNET_NID_ANY);
if (rc != 0) {
CNETERR("Error sending PUT to %s: %d\n",
libcfs_id2str(target), rc);
+ msg->msg_no_resend = true;
+ lnet_detach_rsp_tracker(msg->msg_md, cpt);
lnet_finalize(msg, rc);
}
lnet_net_lock(cpt);
lnet_incr_stats(&ni->ni_stats, LNET_MSG_GET, LNET_STATS_TYPE_DROP);
- the_lnet.ln_counters[cpt]->drop_count++;
- the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length +=
+ getmd->md_length;
lnet_net_unlock(cpt);
if (msg != NULL)
int
LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
struct lnet_process_id target, unsigned int portal,
- __u64 match_bits, unsigned int offset)
+ __u64 match_bits, unsigned int offset, bool recovery)
{
- struct lnet_msg *msg;
- struct lnet_libmd *md;
- int cpt;
- int rc;
+ struct lnet_msg *msg;
+ struct lnet_libmd *md;
+ struct lnet_rsp_tracker *rspt;
+ int cpt;
+ int rc;
LASSERT(the_lnet.ln_refcount > 0);
}
msg = lnet_msg_alloc();
- if (msg == NULL) {
+ if (!msg) {
CERROR("Dropping GET to %s: ENOMEM on struct lnet_msg\n",
libcfs_id2str(target));
return -ENOMEM;
}
cpt = lnet_cpt_of_cookie(mdh.cookie);
+
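+ /* a GET always expects a REPLY, so always track the response */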
+ rspt = lnet_rspt_alloc(cpt);
+ if (!rspt) {
+ CERROR("Dropping GET to %s: ENOMEM on response tracker\n",
+ libcfs_id2str(target));
+ lnet_msg_free(msg);
+ return -ENOMEM;
+ }
+ INIT_LIST_HEAD(&rspt->rspt_on_list);
+
+ msg->msg_recovery = recovery;
+
lnet_res_lock(cpt);
md = lnet_handle2md(&mdh);
lnet_res_unlock(cpt);
lnet_msg_free(msg);
+ LIBCFS_FREE(rspt, sizeof(*rspt));
return -ENOENT;
}
lnet_build_msg_event(msg, LNET_EVENT_SEND);
+ lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
+
rc = lnet_send(self, msg, LNET_NID_ANY);
if (rc < 0) {
CNETERR("Error sending GET to %s: %d\n",
libcfs_id2str(target), rc);
+ msg->msg_no_resend = true;
+ lnet_detach_rsp_tracker(msg->msg_md, cpt);
lnet_finalize(msg, rc);
}