Whamcloud - gitweb
LU-9480 lnet: add enhanced statistics
[fs/lustre-release.git] / lnet / lnet / lib-move.c
index b8e1f2b..7309d43 100644 (file)
@@ -44,6 +44,101 @@ static int local_nid_dist_zero = 1;
 module_param(local_nid_dist_zero, int, 0444);
 MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
 
+/*
+ * Map a statistics selector to the matching counter set within
+ * @stats: send, receive, or drop counters.
+ *
+ * Returns NULL (after logging an error) for an unrecognized
+ * stats_type; callers must check for NULL before dereferencing.
+ */
+static inline struct lnet_comm_count *
+get_stats_counts(struct lnet_element_stats *stats,
+                enum lnet_stats_type stats_type)
+{
+       switch (stats_type) {
+       case LNET_STATS_TYPE_SEND:
+               return &stats->el_send_stats;
+       case LNET_STATS_TYPE_RECV:
+               return &stats->el_recv_stats;
+       case LNET_STATS_TYPE_DROP:
+               return &stats->el_drop_stats;
+       default:
+               CERROR("Unknown stats type\n");
+       }
+
+       return NULL;
+}
+
+/*
+ * Atomically bump the per-message-type counter (ACK/PUT/GET/REPLY/HELLO)
+ * in the counter set selected by @stats_type (send/recv/drop).
+ *
+ * Silently does nothing if @stats_type is unrecognized (get_stats_counts
+ * returns NULL); an unknown @msg_type is logged and ignored.
+ */
+void lnet_incr_stats(struct lnet_element_stats *stats, lnet_msg_type_t msg_type,
+                    enum lnet_stats_type stats_type)
+{
+       struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
+       if (!counts)
+               return;
+
+       switch (msg_type) {
+       case LNET_MSG_ACK:
+               atomic_inc(&counts->co_ack_count);
+               break;
+       case LNET_MSG_PUT:
+               atomic_inc(&counts->co_put_count);
+               break;
+       case LNET_MSG_GET:
+               atomic_inc(&counts->co_get_count);
+               break;
+       case LNET_MSG_REPLY:
+               atomic_inc(&counts->co_reply_count);
+               break;
+       case LNET_MSG_HELLO:
+               atomic_inc(&counts->co_hello_count);
+               break;
+       default:
+               CERROR("There is a BUG in the code. Unknown message type\n");
+               break;
+       }
+}
+
+/*
+ * Return the total of all message-type counters (ACK + PUT + GET +
+ * REPLY + HELLO) for the counter set selected by @stats_type.
+ *
+ * Returns 0 for an unrecognized @stats_type. Note the five counters
+ * are read individually (not under a lock), so the sum is only a
+ * snapshot if counters are concurrently updated.
+ */
+__u32 lnet_sum_stats(struct lnet_element_stats *stats,
+                    enum lnet_stats_type stats_type)
+{
+       struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
+       if (!counts)
+               return 0;
+
+       return (atomic_read(&counts->co_ack_count) +
+               atomic_read(&counts->co_put_count) +
+               atomic_read(&counts->co_get_count) +
+               atomic_read(&counts->co_reply_count) +
+               atomic_read(&counts->co_hello_count));
+}
+
+/*
+ * Copy one set of kernel-side atomic counters into the plain-integer
+ * ioctl structure handed back to userspace.
+ */
+static inline void assign_stats(struct lnet_ioctl_comm_count *msg_stats,
+                               struct lnet_comm_count *counts)
+{
+       msg_stats->ico_get_count = atomic_read(&counts->co_get_count);
+       msg_stats->ico_put_count = atomic_read(&counts->co_put_count);
+       msg_stats->ico_reply_count = atomic_read(&counts->co_reply_count);
+       msg_stats->ico_ack_count = atomic_read(&counts->co_ack_count);
+       msg_stats->ico_hello_count = atomic_read(&counts->co_hello_count);
+}
+
+/*
+ * Translate kernel element statistics into the userspace ioctl
+ * representation: send, receive, and drop counter sets in turn.
+ *
+ * If any counter-set lookup fails the function returns early, leaving
+ * the remaining fields of @msg_stats untouched. With the three valid
+ * LNET_STATS_TYPE_* selectors used here, get_stats_counts() cannot
+ * actually return NULL, so the checks are defensive only.
+ */
+void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats,
+                             struct lnet_element_stats *stats)
+{
+       struct lnet_comm_count *counts;
+
+       LASSERT(msg_stats);
+       LASSERT(stats);
+
+       counts = get_stats_counts(stats, LNET_STATS_TYPE_SEND);
+       if (!counts)
+               return;
+       assign_stats(&msg_stats->im_send_stats, counts);
+
+       counts = get_stats_counts(stats, LNET_STATS_TYPE_RECV);
+       if (!counts)
+               return;
+       assign_stats(&msg_stats->im_recv_stats, counts);
+
+       counts = get_stats_counts(stats, LNET_STATS_TYPE_DROP);
+       if (!counts)
+               return;
+       assign_stats(&msg_stats->im_drop_stats, counts);
+}
+
 int
 lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
 {
@@ -630,6 +725,8 @@ lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target,
 
        memset (&msg->msg_hdr, 0, sizeof (msg->msg_hdr));
        msg->msg_hdr.type           = cpu_to_le32(type);
+       /* dest_nid will be overwritten by lnet_select_pathway() */
+       msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
        msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
        /* src_nid will be set later */
        msg->msg_hdr.src_pid        = cpu_to_le32(the_lnet.ln_pid);
@@ -822,9 +919,13 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
                the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
                lnet_net_unlock(cpt);
                if (msg->msg_txpeer)
-                       atomic_inc(&msg->msg_txpeer->lpni_stats.drop_count);
+                       lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
+                                       msg->msg_type,
+                                       LNET_STATS_TYPE_DROP);
                if (msg->msg_txni)
-                       atomic_inc(&msg->msg_txni->ni_stats.drop_count);
+                       lnet_incr_stats(&msg->msg_txni->ni_stats,
+                                       msg->msg_type,
+                                       LNET_STATS_TYPE_DROP);
 
                CNETERR("Dropping message for %s: peer not alive\n",
                        libcfs_id2str(msg->msg_target));
@@ -1389,6 +1490,27 @@ lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *cur_ni,
        return best_ni;
 }
 
+/*
+ * Traffic to the LNET_RESERVED_PORTAL may not trigger peer discovery,
+ * because such traffic is required to perform discovery. We therefore
+ * exclude all GET and PUT on that portal. We also exclude all ACK and
+ * REPLY traffic, but that is because the portal is not tracked in the
+ * message structure for these message types. We could restrict this
+ * further by also checking for LNET_PROTO_PING_MATCHBITS.
+ *
+ * Returns true only for PUT/GET messages aimed at a non-reserved
+ * portal; every other message type returns false.
+ */
+static bool
+lnet_msg_discovery(struct lnet_msg *msg)
+{
+       if (msg->msg_type == LNET_MSG_PUT) {
+               /* portal index lives in the PUT-specific header union */
+               if (msg->msg_hdr.msg.put.ptl_index != LNET_RESERVED_PORTAL)
+                       return true;
+       } else if (msg->msg_type == LNET_MSG_GET) {
+               /* portal index lives in the GET-specific header union */
+               if (msg->msg_hdr.msg.get.ptl_index != LNET_RESERVED_PORTAL)
+                       return true;
+       }
+       return false;
+}
+
 static int
 lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
                    struct lnet_msg *msg, lnet_nid_t rtr_nid)
@@ -1401,7 +1523,6 @@ lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
        struct lnet_peer        *peer;
        struct lnet_peer_net    *peer_net;
        struct lnet_net         *local_net;
-       __u32                   seq;
        int                     cpt, cpt2, rc;
        bool                    routing;
        bool                    routing2;
@@ -1436,13 +1557,6 @@ again:
        routing2 = false;
        local_found = false;
 
-       seq = lnet_get_dlc_seq_locked();
-
-       if (the_lnet.ln_state != LNET_STATE_RUNNING) {
-               lnet_net_unlock(cpt);
-               return -ESHUTDOWN;
-       }
-
        /*
         * lnet_nid2peerni_locked() is the path that will find an
         * existing peer_ni, or create one and mark it as having been
@@ -1453,7 +1567,34 @@ again:
                lnet_net_unlock(cpt);
                return PTR_ERR(lpni);
        }
+       /*
+        * Now that we have a peer_ni, check if we want to discover
+        * the peer. Traffic to the LNET_RESERVED_PORTAL should not
+        * trigger discovery.
+        */
        peer = lpni->lpni_peer_net->lpn_peer;
+       if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) {
+               rc = lnet_discover_peer_locked(lpni, cpt, false);
+               if (rc) {
+                       lnet_peer_ni_decref_locked(lpni);
+                       lnet_net_unlock(cpt);
+                       return rc;
+               }
+               /* The peer may have changed. */
+               peer = lpni->lpni_peer_net->lpn_peer;
+               /* queue message and return */
+               msg->msg_src_nid_param = src_nid;
+               msg->msg_rtr_nid_param = rtr_nid;
+               msg->msg_sending = 0;
+               list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
+               lnet_peer_ni_decref_locked(lpni);
+               lnet_net_unlock(cpt);
+
+               CDEBUG(D_NET, "%s pending discovery\n",
+                      libcfs_nid2str(peer->lp_primary_nid));
+
+               return LNET_DC_WAIT;
+       }
        lnet_peer_ni_decref_locked(lpni);
 
        /* If peer is not healthy then can not send anything to it */
@@ -1568,8 +1709,9 @@ again:
                        /* Get the target peer_ni */
                        peer_net = lnet_peer_get_net_locked(peer,
                                        LNET_NIDNET(best_lpni->lpni_nid));
+                       LASSERT(peer_net != NULL);
                        list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
-                                           lpni_on_peer_net_list) {
+                                           lpni_peer_nis) {
                                if (lpni->lpni_pref_nnids == 0)
                                        continue;
                                LASSERT(lpni->lpni_pref_nnids == 1);
@@ -1592,7 +1734,7 @@ again:
                        }
                        lpni = list_entry(peer_net->lpn_peer_nis.next,
                                        struct lnet_peer_ni,
-                                       lpni_on_peer_net_list);
+                                       lpni_peer_nis);
                }
                /* Set preferred NI if necessary. */
                if (lpni->lpni_pref_nnids == 0)
@@ -1624,7 +1766,7 @@ again:
         * then the best route is chosen. If all routes are equal then
         * they are used in round robin.
         */
-       list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_on_peer_list) {
+       list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
                if (!lnet_is_peer_net_healthy_locked(peer_net))
                        continue;
 
@@ -1634,7 +1776,7 @@ again:
 
                        lpni = list_entry(peer_net->lpn_peer_nis.next,
                                          struct lnet_peer_ni,
-                                         lpni_on_peer_net_list);
+                                         lpni_peer_nis);
 
                        net_gw = lnet_find_route_locked(NULL,
                                                        lpni->lpni_nid,
@@ -1881,6 +2023,7 @@ send:
         */
        cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
        if (cpt != cpt2) {
+               __u32 seq = lnet_get_dlc_seq_locked();
                lnet_net_unlock(cpt);
                cpt = cpt2;
                lnet_net_lock(cpt);
@@ -1991,14 +2134,16 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
        if (rc == LNET_CREDIT_OK)
                lnet_ni_send(msg->msg_txni, msg);
 
-       /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT */
+       /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */
        return 0;
 }
 
 void
-lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob)
+lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob,
+                 __u32 msg_type)
 {
        lnet_net_lock(cpt);
+       lnet_incr_stats(&ni->ni_stats, msg_type, LNET_STATS_TYPE_DROP);
        the_lnet.ln_counters[cpt]->drop_count++;
        the_lnet.ln_counters[cpt]->drop_length += nob;
        lnet_net_unlock(cpt);
@@ -2434,11 +2579,12 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
        for_me = (ni->ni_nid == dest_nid);
        cpt = lnet_cpt_of_nid(from_nid, ni);
 
-       CDEBUG(D_NET, "TRACE: %s(%s) <- %s : %s\n",
+       CDEBUG(D_NET, "TRACE: %s(%s) <- %s : %s - %s\n",
                libcfs_nid2str(dest_nid),
                libcfs_nid2str(ni->ni_nid),
                libcfs_nid2str(src_nid),
-               lnet_msgtyp2str(type));
+               lnet_msgtyp2str(type),
+               (for_me) ? "for me" : "routed");
 
        switch (type) {
        case LNET_MSG_ACK:
@@ -2654,7 +2800,7 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
        lnet_finalize(msg, rc);
 
  drop:
-       lnet_drop_message(ni, cpt, private, payload_length);
+       lnet_drop_message(ni, cpt, private, payload_length, type);
        return 0;
 }
 EXPORT_SYMBOL(lnet_parse);
@@ -2690,7 +2836,8 @@ lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
                 * until that's done */
 
                lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt,
-                                 msg->msg_private, msg->msg_len);
+                                 msg->msg_private, msg->msg_len,
+                                 msg->msg_type);
                /*
                 * NB: message will not generate event because w/o attached MD,
                 * but we still should give error code so lnet_msg_decommit()
@@ -2930,6 +3077,7 @@ lnet_create_reply_msg(struct lnet_ni *ni, struct lnet_msg *getmsg)
        cpt = lnet_cpt_of_nid(peer_id.nid, ni);
 
        lnet_net_lock(cpt);
+       lnet_incr_stats(&ni->ni_stats, LNET_MSG_GET, LNET_STATS_TYPE_DROP);
        the_lnet.ln_counters[cpt]->drop_count++;
        the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
        lnet_net_unlock(cpt);