Whamcloud - gitweb
LU-7734 lnet: Multi-Rail local NI split
[fs/lustre-release.git] / lnet / lnet / lib-move.c
index d93d061..e9a63eb 100644 (file)
@@ -584,13 +584,14 @@ lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
                        iov  = msg->msg_iov;
                        kiov = msg->msg_kiov;
 
-                       LASSERT(niov > 0);
-                       LASSERT((iov == NULL) != (kiov == NULL));
+                       LASSERT (niov > 0);
+                       LASSERT ((iov == NULL) != (kiov == NULL));
                }
        }
 
-       rc = (ni->ni_lnd->lnd_recv)(ni, private, msg, delayed,
-                                   niov, iov, kiov, offset, mlen, rlen);
+       rc = (ni->ni_net->net_lnd->lnd_recv)(ni, private, msg, delayed,
+                                            niov, iov, kiov, offset, mlen,
+                                            rlen);
        if (rc < 0)
                lnet_finalize(ni, msg, rc);
 }
@@ -645,7 +646,7 @@ lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
        LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
                 (msg->msg_txcredit && msg->msg_peertxcredit));
 
-       rc = (ni->ni_lnd->lnd_send)(ni, priv, msg);
+       rc = (ni->ni_net->net_lnd->lnd_send)(ni, priv, msg);
        if (rc < 0)
                lnet_finalize(ni, msg, rc);
 }
@@ -658,11 +659,11 @@ lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
        LASSERT(!msg->msg_sending);
        LASSERT(msg->msg_receiving);
        LASSERT(!msg->msg_rx_ready_delay);
-       LASSERT(ni->ni_lnd->lnd_eager_recv != NULL);
+       LASSERT(ni->ni_net->net_lnd->lnd_eager_recv != NULL);
 
        msg->msg_rx_ready_delay = 1;
-       rc = (ni->ni_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
-                                         &msg->msg_private);
+       rc = (ni->ni_net->net_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
+                                                 &msg->msg_private);
        if (rc != 0) {
                CERROR("recv from %s / send to %s aborted: "
                       "eager_recv failed %d\n",
@@ -681,10 +682,10 @@ lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
        cfs_time_t last_alive = 0;
 
        LASSERT(lnet_peer_aliveness_enabled(lp));
-       LASSERT(ni->ni_lnd->lnd_query != NULL);
+       LASSERT(ni->ni_net->net_lnd->lnd_query != NULL);
 
        lnet_net_unlock(lp->lp_cpt);
-       (ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
+       (ni->ni_net->net_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
        lnet_net_lock(lp->lp_cpt);
 
        lp->lp_last_query = cfs_time_current();
@@ -697,23 +698,27 @@ lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
 static inline int
 lnet_peer_is_alive (lnet_peer_t *lp, cfs_time_t now)
 {
-       int        alive;
+       int        alive;
        cfs_time_t deadline;
 
-       LASSERT(lnet_peer_aliveness_enabled(lp));
+       LASSERT (lnet_peer_aliveness_enabled(lp));
 
-       /* Trust lnet_notify() if it has more recent aliveness news, but
+       /*
+        * Trust lnet_notify() if it has more recent aliveness news, but
         * ignore the initial assumed death (see lnet_peers_start_down()).
         */
        if (!lp->lp_alive && lp->lp_alive_count > 0 &&
            cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
                return 0;
 
-       deadline = cfs_time_add(lp->lp_last_alive,
-                               cfs_time_seconds(lp->lp_ni->ni_peertimeout));
+       deadline =
+         cfs_time_add(lp->lp_last_alive,
+                      cfs_time_seconds(lp->lp_net->net_tunables.
+                                       lct_peer_timeout));
        alive = cfs_time_after(deadline, now);
 
-       /* Update obsolete lp_alive except for routers assumed to be dead
+       /*
+        * Update obsolete lp_alive except for routers assumed to be dead
         * initially, because router checker would update aliveness in this
         * case, and moreover lp_last_alive at peer creation is assumed.
         */
@@ -728,7 +733,7 @@ lnet_peer_is_alive (lnet_peer_t *lp, cfs_time_t now)
 /* NB: returns 1 when alive, 0 when dead, negative when error;
  *     may drop the lnet_net_lock */
 static int
-lnet_peer_alive_locked (lnet_peer_t *lp)
+lnet_peer_alive_locked (struct lnet_ni *ni, lnet_peer_t *lp)
 {
        cfs_time_t now = cfs_time_current();
 
@@ -738,8 +743,10 @@ lnet_peer_alive_locked (lnet_peer_t *lp)
        if (lnet_peer_is_alive(lp, now))
                return 1;
 
-       /* Peer appears dead, but we should avoid frequent NI queries (at
-        * most once per lnet_queryinterval seconds). */
+       /*
+        * Peer appears dead, but we should avoid frequent NI queries (at
+        * most once per lnet_queryinterval seconds).
+        */
        if (lp->lp_last_query != 0) {
                static const int lnet_queryinterval = 1;
 
@@ -754,13 +761,13 @@ lnet_peer_alive_locked (lnet_peer_t *lp)
                                      libcfs_nid2str(lp->lp_nid),
                                      (int)now, (int)next_query,
                                      lnet_queryinterval,
-                                     lp->lp_ni->ni_peertimeout);
+                                     lp->lp_net->net_tunables.lct_peer_timeout);
                        return 0;
                }
        }
 
        /* query NI for latest aliveness news */
-       lnet_ni_query_locked(lp->lp_ni, lp);
+       lnet_ni_query_locked(ni, lp);
 
        if (lnet_peer_is_alive(lp, now))
                return 1;
@@ -784,7 +791,7 @@ static int
 lnet_post_send_locked(lnet_msg_t *msg, int do_send)
 {
        lnet_peer_t             *lp = msg->msg_txpeer;
-       lnet_ni_t               *ni = lp->lp_ni;
+       lnet_ni_t               *ni = msg->msg_txni;
        int                     cpt = msg->msg_tx_cpt;
        struct lnet_tx_queue    *tq = ni->ni_tx_queues[cpt];
 
@@ -795,7 +802,7 @@ lnet_post_send_locked(lnet_msg_t *msg, int do_send)
 
        /* NB 'lp' is always the next hop */
        if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
-           lnet_peer_alive_locked(lp) == 0) {
+           lnet_peer_alive_locked(ni, lp) == 0) {
                the_lnet.ln_counters[cpt]->drop_count++;
                the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
                lnet_net_unlock(cpt);
@@ -954,7 +961,7 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
                int cpt = msg->msg_rx_cpt;
 
                lnet_net_unlock(cpt);
-               lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
+               lnet_ni_recv(msg->msg_rxni, msg->msg_private, msg, 1,
                             0, msg->msg_len, msg->msg_len);
                lnet_net_lock(cpt);
        }
@@ -966,9 +973,10 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
 {
        lnet_peer_t     *txpeer = msg->msg_txpeer;
        lnet_msg_t      *msg2;
+       struct lnet_ni  *txni = msg->msg_txni;
 
        if (msg->msg_txcredit) {
-               struct lnet_ni       *ni = txpeer->lp_ni;
+               struct lnet_ni       *ni = msg->msg_txni;
                struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];
 
                /* give back NI txcredits */
@@ -983,7 +991,7 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
                                          lnet_msg_t, msg_list);
                        list_del(&msg2->msg_list);
 
-                       LASSERT(msg2->msg_txpeer->lp_ni == ni);
+                       LASSERT(msg2->msg_txni == ni);
                        LASSERT(msg2->msg_tx_delayed);
 
                        (void) lnet_post_send_locked(msg2, 1);
@@ -1013,6 +1021,11 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
                }
        }
 
+       if (txni != NULL) {
+               msg->msg_txni = NULL;
+               lnet_ni_decref_locked(txni, msg->msg_tx_cpt);
+       }
+
        if (txpeer != NULL) {
                msg->msg_txpeer = NULL;
                lnet_peer_decref_locked(txpeer);
@@ -1047,7 +1060,7 @@ lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
        lnet_net_unlock(cpt);
 
        list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
-               lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL,
+               lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
                             0, 0, 0, msg->msg_hdr.payload_length);
                list_del_init(&msg->msg_list);
                lnet_finalize(NULL, msg, -ECANCELED);
@@ -1060,6 +1073,7 @@ void
 lnet_return_rx_credits_locked(lnet_msg_t *msg)
 {
        lnet_peer_t     *rxpeer = msg->msg_rxpeer;
+       struct lnet_ni  *rxni = msg->msg_rxni;
        lnet_msg_t      *msg2;
 
        if (msg->msg_rtrcredit) {
@@ -1129,6 +1143,10 @@ routing_off:
                        (void) lnet_post_routed_recv_locked(msg2, 1);
                }
        }
+       if (rxni != NULL) {
+               msg->msg_rxni = NULL;
+               lnet_ni_decref_locked(rxni, msg->msg_rx_cpt);
+       }
        if (rxpeer != NULL) {
                msg->msg_rxpeer = NULL;
                lnet_peer_decref_locked(rxpeer);
@@ -1174,7 +1192,8 @@ lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
 }
 
 static lnet_peer_t *
-lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
+lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
+                      lnet_nid_t rtr_nid)
 {
        lnet_remotenet_t        *rnet;
        lnet_route_t            *route;
@@ -1187,7 +1206,7 @@ lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
        /* If @rtr_nid is not LNET_NID_ANY, return the gateway with
         * rtr_nid nid, otherwise find the best gateway I can use */
 
-       rnet = lnet_find_net_locked(LNET_NIDNET(target));
+       rnet = lnet_find_rnet_locked(LNET_NIDNET(target));
        if (rnet == NULL)
                return NULL;
 
@@ -1199,7 +1218,7 @@ lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
                if (!lnet_is_route_alive(route))
                        continue;
 
-               if (ni != NULL && lp->lp_ni != ni)
+               if (net != NULL && lp->lp_net != net)
                        continue;
 
                if (lp->lp_nid == rtr_nid) /* it's pre-determined router */
@@ -1254,14 +1273,13 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
        msg->msg_sending = 1;
 
        LASSERT(!msg->msg_tx_committed);
-       cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid);
+       local_ni = lnet_net2ni(LNET_NIDNET(dst_nid));
+       cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid,
+                             local_ni);
  again:
-       lnet_net_lock(cpt);
-
-       if (the_lnet.ln_shutdown) {
-               lnet_net_unlock(cpt);
+       if (the_lnet.ln_shutdown)
                return -ESHUTDOWN;
-       }
+       lnet_net_lock(cpt);
 
        if (src_nid == LNET_NID_ANY) {
                src_ni = NULL;
@@ -1284,11 +1302,7 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
                if (src_ni == NULL) {
                        src_ni = local_ni;
                        src_nid = src_ni->ni_nid;
-               } else if (src_ni == local_ni) {
-                       lnet_ni_decref_locked(local_ni, cpt);
-               } else {
-                       lnet_ni_decref_locked(local_ni, cpt);
-                       lnet_ni_decref_locked(src_ni, cpt);
+               } else if (src_ni != local_ni) {
                        lnet_net_unlock(cpt);
                        LCONSOLE_WARN("No route to %s via from %s\n",
                                      libcfs_nid2str(dst_nid),
@@ -1306,16 +1320,10 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
                        /* No send credit hassles with LOLND */
                        lnet_net_unlock(cpt);
                        lnet_ni_send(src_ni, msg);
-
-                       lnet_net_lock(cpt);
-                       lnet_ni_decref_locked(src_ni, cpt);
-                       lnet_net_unlock(cpt);
                        return 0;
                }
 
                rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
-               /* lp has ref on src_ni; lose mine */
-               lnet_ni_decref_locked(src_ni, cpt);
                if (rc != 0) {
                        lnet_net_unlock(cpt);
                        LCONSOLE_WARN("Error %d finding peer %s\n", rc,
@@ -1323,13 +1331,13 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
                        /* ENOMEM or shutting down */
                        return rc;
                }
-               LASSERT(lp->lp_ni == src_ni);
+               LASSERT (lp->lp_net == src_ni->ni_net);
        } else {
                /* sending to a remote network */
-               lp = lnet_find_route_locked(src_ni, dst_nid, rtr_nid);
+               lp = lnet_find_route_locked(src_ni != NULL ?
+                                           src_ni->ni_net : NULL,
+                                           dst_nid, rtr_nid);
                if (lp == NULL) {
-                       if (src_ni != NULL)
-                               lnet_ni_decref_locked(src_ni, cpt);
                        lnet_net_unlock(cpt);
 
                        LCONSOLE_WARN("No route to %s via %s "
@@ -1344,10 +1352,8 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
                 * pre-determined router, this can happen if router table
                 * was changed when we release the lock */
                if (rtr_nid != lp->lp_nid) {
-                       cpt2 = lnet_cpt_of_nid_locked(lp->lp_nid);
+                       cpt2 = lp->lp_cpt;
                        if (cpt2 != cpt) {
-                               if (src_ni != NULL)
-                                       lnet_ni_decref_locked(src_ni, cpt);
                                lnet_net_unlock(cpt);
 
                                rtr_nid = lp->lp_nid;
@@ -1361,11 +1367,11 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
                       lnet_msgtyp2str(msg->msg_type), msg->msg_len);
 
                if (src_ni == NULL) {
-                       src_ni = lp->lp_ni;
+                       src_ni = lnet_get_next_ni_locked(lp->lp_net, NULL);
+                       LASSERT(src_ni != NULL);
                        src_nid = src_ni->ni_nid;
                } else {
-                       LASSERT(src_ni == lp->lp_ni);
-                       lnet_ni_decref_locked(src_ni, cpt);
+                       LASSERT (src_ni->ni_net == lp->lp_net);
                }
 
                lnet_peer_addref_locked(lp);
@@ -1389,7 +1395,10 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
        LASSERT(!msg->msg_txcredit);
        LASSERT(msg->msg_txpeer == NULL);
 
-       msg->msg_txpeer = lp;                   /* msg takes my ref on lp */
+       msg->msg_txpeer = lp;                   /* msg takes my ref on lp */
+       /* set the NI for this message */
+       msg->msg_txni = src_ni;
+       lnet_ni_addref_locked(msg->msg_txni, cpt);
 
        rc = lnet_post_send_locked(msg, 0);
        lnet_net_unlock(cpt);
@@ -1453,8 +1462,9 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
        info.mi_rlength = hdr->payload_length;
        info.mi_roffset = hdr->msg.put.offset;
        info.mi_mbits   = hdr->msg.put.match_bits;
+       info.mi_cpt     = msg->msg_rxpeer->lp_cpt;
 
-       msg->msg_rx_ready_delay = ni->ni_lnd->lnd_eager_recv == NULL;
+       msg->msg_rx_ready_delay = ni->ni_net->net_lnd->lnd_eager_recv == NULL;
        ready_delay = msg->msg_rx_ready_delay;
 
  again:
@@ -1687,7 +1697,7 @@ lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
 
        if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
            lnet_msg2bufpool(msg)->rbp_credits <= 0) {
-               if (ni->ni_lnd->lnd_eager_recv == NULL) {
+               if (ni->ni_net->net_lnd->lnd_eager_recv == NULL) {
                        msg->msg_rx_ready_delay = 1;
                } else {
                        lnet_net_unlock(msg->msg_rx_cpt);
@@ -1832,7 +1842,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
        payload_length = le32_to_cpu(hdr->payload_length);
 
        for_me = (ni->ni_nid == dest_nid);
-       cpt = lnet_cpt_of_nid(from_nid);
+       cpt = lnet_cpt_of_nid(from_nid, ni);
 
        switch (type) {
        case LNET_MSG_ACK:
@@ -1994,6 +2004,8 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
                        return 0;
                goto drop;
        }
+       msg->msg_rxni = ni;
+       lnet_ni_addref_locked(ni, cpt);
 
        if (lnet_isrouter(msg->msg_rxpeer)) {
                lnet_peer_set_alive(msg->msg_rxpeer);
@@ -2078,7 +2090,7 @@ lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
                 * called lnet_drop_message(), so I just hang onto msg as well
                 * until that's done */
 
-               lnet_drop_message(msg->msg_rxpeer->lp_ni,
+               lnet_drop_message(msg->msg_rxni,
                                  msg->msg_rxpeer->lp_cpt,
                                  msg->msg_private, msg->msg_len);
                /*
@@ -2086,7 +2098,7 @@ lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
                 * but we still should give error code so lnet_msg_decommit()
                 * can skip counters operations and other checks.
                 */
-               lnet_finalize(msg->msg_rxpeer->lp_ni, msg, -ENOENT);
+               lnet_finalize(msg->msg_rxni, msg, -ENOENT);
        }
 }
 
@@ -2109,6 +2121,7 @@ lnet_recv_delayed_msg_list(struct list_head *head)
                LASSERT(msg->msg_rx_delayed);
                LASSERT(msg->msg_md != NULL);
                LASSERT(msg->msg_rxpeer != NULL);
+               LASSERT(msg->msg_rxni != NULL);
                LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
 
                CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
@@ -2118,7 +2131,7 @@ lnet_recv_delayed_msg_list(struct list_head *head)
                        msg->msg_hdr.msg.put.offset,
                        msg->msg_hdr.payload_length);
 
-               lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
+               lnet_recv_put(msg->msg_rxni, msg);
        }
 }
 
@@ -2303,7 +2316,7 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
        lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
        lnet_res_unlock(cpt);
 
-       cpt = lnet_cpt_of_nid(peer_id.nid);
+       cpt = lnet_cpt_of_nid(peer_id.nid, ni);
 
        lnet_net_lock(cpt);
        lnet_msg_commit(msg, cpt);
@@ -2314,7 +2327,7 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
        return msg;
 
  drop:
-       cpt = lnet_cpt_of_nid(peer_id.nid);
+       cpt = lnet_cpt_of_nid(peer_id.nid, ni);
 
        lnet_net_lock(cpt);
        the_lnet.ln_counters[cpt]->drop_count++;
@@ -2461,7 +2474,7 @@ int
 LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
 {
        struct list_head        *e;
-       struct lnet_ni          *ni;
+       struct lnet_ni          *ni = NULL;
        lnet_remotenet_t        *rnet;
        __u32                   dstnet = LNET_NIDNET(dstnid);
        int                     hops;
@@ -2478,9 +2491,7 @@ LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
 
        cpt = lnet_net_lock_current();
 
-       list_for_each(e, &the_lnet.ln_nis) {
-               ni = list_entry(e, lnet_ni_t, ni_list);
-
+       while ((ni = lnet_get_next_ni_locked(NULL, ni))) {
                if (ni->ni_nid == dstnid) {
                        if (srcnidp != NULL)
                                *srcnidp = dstnid;
@@ -2540,8 +2551,12 @@ LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
 
                        LASSERT(shortest != NULL);
                        hops = shortest_hops;
-                       if (srcnidp != NULL)
-                               *srcnidp = shortest->lr_gateway->lp_ni->ni_nid;
+                       if (srcnidp != NULL) {
+                               ni = lnet_get_next_ni_locked(
+                                       shortest->lr_gateway->lp_net,
+                                       NULL);
+                               *srcnidp = ni->ni_nid;
+                       }
                        if (orderp != NULL)
                                *orderp = order;
                        lnet_net_unlock(cpt);