Whamcloud - gitweb
LU-9480 lnet: add enhanced statistics
[fs/lustre-release.git] / lnet / lnet / lib-move.c
index 65a4635..7309d43 100644 (file)
@@ -44,6 +44,101 @@ static int local_nid_dist_zero = 1;
 module_param(local_nid_dist_zero, int, 0444);
 MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
 
+static inline struct lnet_comm_count *
+get_stats_counts(struct lnet_element_stats *stats,
+                enum lnet_stats_type stats_type)
+{
+       switch (stats_type) {
+       case LNET_STATS_TYPE_SEND:
+               return &stats->el_send_stats;
+       case LNET_STATS_TYPE_RECV:
+               return &stats->el_recv_stats;
+       case LNET_STATS_TYPE_DROP:
+               return &stats->el_drop_stats;
+       default:
+               CERROR("Unknown stats type\n");
+       }
+
+       return NULL;
+}
+
+void lnet_incr_stats(struct lnet_element_stats *stats, lnet_msg_type_t msg_type,
+                    enum lnet_stats_type stats_type)
+{
+       struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
+       if (!counts)
+               return;
+
+       switch (msg_type) {
+       case LNET_MSG_ACK:
+               atomic_inc(&counts->co_ack_count);
+               break;
+       case LNET_MSG_PUT:
+               atomic_inc(&counts->co_put_count);
+               break;
+       case LNET_MSG_GET:
+               atomic_inc(&counts->co_get_count);
+               break;
+       case LNET_MSG_REPLY:
+               atomic_inc(&counts->co_reply_count);
+               break;
+       case LNET_MSG_HELLO:
+               atomic_inc(&counts->co_hello_count);
+               break;
+       default:
+               CERROR("There is a BUG in the code. Unknown message type\n");
+               break;
+       }
+}
+
+__u32 lnet_sum_stats(struct lnet_element_stats *stats,
+                    enum lnet_stats_type stats_type)
+{
+       struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
+       if (!counts)
+               return 0;
+
+       return (atomic_read(&counts->co_ack_count) +
+               atomic_read(&counts->co_put_count) +
+               atomic_read(&counts->co_get_count) +
+               atomic_read(&counts->co_reply_count) +
+               atomic_read(&counts->co_hello_count));
+}
+
+static inline void assign_stats(struct lnet_ioctl_comm_count *msg_stats,
+                               struct lnet_comm_count *counts)
+{
+       msg_stats->ico_get_count = atomic_read(&counts->co_get_count);
+       msg_stats->ico_put_count = atomic_read(&counts->co_put_count);
+       msg_stats->ico_reply_count = atomic_read(&counts->co_reply_count);
+       msg_stats->ico_ack_count = atomic_read(&counts->co_ack_count);
+       msg_stats->ico_hello_count = atomic_read(&counts->co_hello_count);
+}
+
+void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats,
+                             struct lnet_element_stats *stats)
+{
+       struct lnet_comm_count *counts;
+
+       LASSERT(msg_stats);
+       LASSERT(stats);
+
+       counts = get_stats_counts(stats, LNET_STATS_TYPE_SEND);
+       if (!counts)
+               return;
+       assign_stats(&msg_stats->im_send_stats, counts);
+
+       counts = get_stats_counts(stats, LNET_STATS_TYPE_RECV);
+       if (!counts)
+               return;
+       assign_stats(&msg_stats->im_recv_stats, counts);
+
+       counts = get_stats_counts(stats, LNET_STATS_TYPE_DROP);
+       if (!counts)
+               return;
+       assign_stats(&msg_stats->im_drop_stats, counts);
+}
+
 int
 lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
 {
@@ -630,6 +725,8 @@ lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target,
 
        memset (&msg->msg_hdr, 0, sizeof (msg->msg_hdr));
        msg->msg_hdr.type           = cpu_to_le32(type);
+       /* dest_nid will be overwritten by lnet_select_pathway() */
+       msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
        msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
        /* src_nid will be set later */
        msg->msg_hdr.src_pid        = cpu_to_le32(the_lnet.ln_pid);
@@ -822,9 +919,13 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
                the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
                lnet_net_unlock(cpt);
                if (msg->msg_txpeer)
-                       atomic_inc(&msg->msg_txpeer->lpni_stats.drop_count);
+                       lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
+                                       msg->msg_type,
+                                       LNET_STATS_TYPE_DROP);
                if (msg->msg_txni)
-                       atomic_inc(&msg->msg_txni->ni_stats.drop_count);
+                       lnet_incr_stats(&msg->msg_txni->ni_stats,
+                                       msg->msg_type,
+                                       LNET_STATS_TYPE_DROP);
 
                CNETERR("Dropping message for %s: peer not alive\n",
                        libcfs_id2str(msg->msg_target));
@@ -1473,7 +1574,7 @@ again:
         */
        peer = lpni->lpni_peer_net->lpn_peer;
        if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) {
-               rc = lnet_discover_peer_locked(lpni, cpt);
+               rc = lnet_discover_peer_locked(lpni, cpt, false);
                if (rc) {
                        lnet_peer_ni_decref_locked(lpni);
                        lnet_net_unlock(cpt);
@@ -1481,6 +1582,18 @@ again:
                }
                /* The peer may have changed. */
                peer = lpni->lpni_peer_net->lpn_peer;
+               /* queue message and return */
+               msg->msg_src_nid_param = src_nid;
+               msg->msg_rtr_nid_param = rtr_nid;
+               msg->msg_sending = 0;
+               list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
+               lnet_peer_ni_decref_locked(lpni);
+               lnet_net_unlock(cpt);
+
+               CDEBUG(D_NET, "%s pending discovery\n",
+                      libcfs_nid2str(peer->lp_primary_nid));
+
+               return LNET_DC_WAIT;
        }
        lnet_peer_ni_decref_locked(lpni);
 
@@ -1596,6 +1709,7 @@ again:
                        /* Get the target peer_ni */
                        peer_net = lnet_peer_get_net_locked(peer,
                                        LNET_NIDNET(best_lpni->lpni_nid));
+                       LASSERT(peer_net != NULL);
                        list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
                                            lpni_peer_nis) {
                                if (lpni->lpni_pref_nnids == 0)
@@ -2020,14 +2134,16 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
        if (rc == LNET_CREDIT_OK)
                lnet_ni_send(msg->msg_txni, msg);
 
-       /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT */
+       /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */
        return 0;
 }
 
 void
-lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob)
+lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob,
+                 __u32 msg_type)
 {
        lnet_net_lock(cpt);
+       lnet_incr_stats(&ni->ni_stats, msg_type, LNET_STATS_TYPE_DROP);
        the_lnet.ln_counters[cpt]->drop_count++;
        the_lnet.ln_counters[cpt]->drop_length += nob;
        lnet_net_unlock(cpt);
@@ -2463,11 +2579,12 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
        for_me = (ni->ni_nid == dest_nid);
        cpt = lnet_cpt_of_nid(from_nid, ni);
 
-       CDEBUG(D_NET, "TRACE: %s(%s) <- %s : %s\n",
+       CDEBUG(D_NET, "TRACE: %s(%s) <- %s : %s - %s\n",
                libcfs_nid2str(dest_nid),
                libcfs_nid2str(ni->ni_nid),
                libcfs_nid2str(src_nid),
-               lnet_msgtyp2str(type));
+               lnet_msgtyp2str(type),
+               (for_me) ? "for me" : "routed");
 
        switch (type) {
        case LNET_MSG_ACK:
@@ -2683,7 +2800,7 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
        lnet_finalize(msg, rc);
 
  drop:
-       lnet_drop_message(ni, cpt, private, payload_length);
+       lnet_drop_message(ni, cpt, private, payload_length, type);
        return 0;
 }
 EXPORT_SYMBOL(lnet_parse);
@@ -2719,7 +2836,8 @@ lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
                 * until that's done */
 
                lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt,
-                                 msg->msg_private, msg->msg_len);
+                                 msg->msg_private, msg->msg_len,
+                                 msg->msg_type);
                /*
                 * NB: message will not generate event because w/o attached MD,
                 * but we still should give error code so lnet_msg_decommit()
@@ -2959,6 +3077,7 @@ lnet_create_reply_msg(struct lnet_ni *ni, struct lnet_msg *getmsg)
        cpt = lnet_cpt_of_nid(peer_id.nid, ni);
 
        lnet_net_lock(cpt);
+       lnet_incr_stats(&ni->ni_stats, LNET_MSG_GET, LNET_STATS_TYPE_DROP);
        the_lnet.ln_counters[cpt]->drop_count++;
        the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
        lnet_net_unlock(cpt);