/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/lnet/lib-move.c
 *
 * Data movement routines
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <linux/pagemap.h>

#include <lnet/lib-lnet.h>
#include <linux/nsproxy.h>
#include <net/net_namespace.h>

extern unsigned int lnet_current_net_count;

static int local_nid_dist_zero = 1;
module_param(local_nid_dist_zero, int, 0444);
MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");

struct lnet_send_data {
        struct lnet_ni *sd_best_ni;
        struct lnet_peer_ni *sd_best_lpni;
        struct lnet_peer_ni *sd_final_dst_lpni;
        struct lnet_peer *sd_peer;
        struct lnet_peer *sd_gw_peer;
        struct lnet_peer_ni *sd_gw_lpni;
        struct lnet_peer_net *sd_peer_net;
        struct lnet_msg *sd_msg;
        lnet_nid_t sd_dst_nid;
        lnet_nid_t sd_src_nid;
        lnet_nid_t sd_rtr_nid;
        int sd_cpt;
        int sd_md_cpt;
        __u32 sd_send_case;
};

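/* Map a stats type onto the matching counter bucket (send, receive or
 * drop) within an element's statistics block; returns NULL for an
 * unknown type. */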
static inline struct lnet_comm_count *
get_stats_counts(struct lnet_element_stats *stats,
                 enum lnet_stats_type stats_type)
{
        switch (stats_type) {
        case LNET_STATS_TYPE_SEND:
                return &stats->el_send_stats;
        case LNET_STATS_TYPE_RECV:
                return &stats->el_recv_stats;
        case LNET_STATS_TYPE_DROP:
                return &stats->el_drop_stats;
        default:
                CERROR("Unknown stats type\n");
        }

        return NULL;
}

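/* Bump the counter for msg_type in the send, receive or drop bucket of
 * the given element statistics. */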
void lnet_incr_stats(struct lnet_element_stats *stats,
                     enum lnet_msg_type msg_type,
                     enum lnet_stats_type stats_type)
{
        struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
        if (!counts)
                return;

        switch (msg_type) {
        case LNET_MSG_ACK:
                atomic_inc(&counts->co_ack_count);
                break;
        case LNET_MSG_PUT:
                atomic_inc(&counts->co_put_count);
                break;
        case LNET_MSG_GET:
                atomic_inc(&counts->co_get_count);
                break;
        case LNET_MSG_REPLY:
                atomic_inc(&counts->co_reply_count);
                break;
        case LNET_MSG_HELLO:
                atomic_inc(&counts->co_hello_count);
                break;
        default:
                CERROR("There is a BUG in the code. Unknown message type\n");
                break;
        }
}

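/* Return the total number of messages of all types recorded in the
 * selected (send, receive or drop) bucket. */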
__u32 lnet_sum_stats(struct lnet_element_stats *stats,
                     enum lnet_stats_type stats_type)
{
        struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
        if (!counts)
                return 0;

        return (atomic_read(&counts->co_ack_count) +
                atomic_read(&counts->co_put_count) +
                atomic_read(&counts->co_get_count) +
                atomic_read(&counts->co_reply_count) +
                atomic_read(&counts->co_hello_count));
}

static inline void assign_stats(struct lnet_ioctl_comm_count *msg_stats,
                                struct lnet_comm_count *counts)
{
        msg_stats->ico_get_count = atomic_read(&counts->co_get_count);
        msg_stats->ico_put_count = atomic_read(&counts->co_put_count);
        msg_stats->ico_reply_count = atomic_read(&counts->co_reply_count);
        msg_stats->ico_ack_count = atomic_read(&counts->co_ack_count);
        msg_stats->ico_hello_count = atomic_read(&counts->co_hello_count);
}

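/* Translate the kernel-side atomic counters into the ioctl structure
 * reported to user space. */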
void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats,
                              struct lnet_element_stats *stats)
{
        struct lnet_comm_count *counts;

        LASSERT(msg_stats);
        LASSERT(stats);

        counts = get_stats_counts(stats, LNET_STATS_TYPE_SEND);
        if (!counts)
                return;
        assign_stats(&msg_stats->im_send_stats, counts);

        counts = get_stats_counts(stats, LNET_STATS_TYPE_RECV);
        if (!counts)
                return;
        assign_stats(&msg_stats->im_recv_stats, counts);

        counts = get_stats_counts(stats, LNET_STATS_TYPE_DROP);
        if (!counts)
                return;
        assign_stats(&msg_stats->im_drop_stats, counts);
}

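/* Configure message-failure fault injection for a NID: a non-zero
 * threshold adds a test peer entry that fails the next 'threshold'
 * messages, while a zero threshold removes matching entries (all
 * entries for LNET_NID_ANY). */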
int
lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
{
        struct lnet_test_peer *tp;
        struct list_head *el;
        struct list_head *next;
        struct list_head  cull;

        /* NB: use lnet_net_lock(0) to serialize operations on test peers */
        if (threshold != 0) {
                /* Adding a new entry */
                LIBCFS_ALLOC(tp, sizeof(*tp));
                if (tp == NULL)
                        return -ENOMEM;

                tp->tp_nid = nid;
                tp->tp_threshold = threshold;

                lnet_net_lock(0);
                list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
                lnet_net_unlock(0);
                return 0;
        }

        /* removing entries */
        INIT_LIST_HEAD(&cull);

        lnet_net_lock(0);

        list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
                tp = list_entry(el, struct lnet_test_peer, tp_list);

                if (tp->tp_threshold == 0 ||    /* needs culling anyway */
                    nid == LNET_NID_ANY ||      /* removing all entries */
                    tp->tp_nid == nid) {        /* matched this one */
                        list_del(&tp->tp_list);
                        list_add(&tp->tp_list, &cull);
                }
        }

        lnet_net_unlock(0);

        while (!list_empty(&cull)) {
                tp = list_entry(cull.next, struct lnet_test_peer, tp_list);

                list_del(&tp->tp_list);
                LIBCFS_FREE(tp, sizeof(*tp));
        }
        return 0;
}

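/* Check whether traffic to or from this NID should be failed according
 * to the test peer table; returns 1 if the message should be dropped
 * and decrements the remaining threshold of the matching entry. */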
static int
fail_peer (lnet_nid_t nid, int outgoing)
{
        struct lnet_test_peer *tp;
        struct list_head *el;
        struct list_head *next;
        struct list_head  cull;
        int               fail = 0;

        INIT_LIST_HEAD(&cull);

        /* NB: use lnet_net_lock(0) to serialize operations on test peers */
        lnet_net_lock(0);

        list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
                tp = list_entry(el, struct lnet_test_peer, tp_list);

                if (tp->tp_threshold == 0) {
                        /* zombie entry */
                        if (outgoing) {
                                /* only cull zombies on outgoing tests,
                                 * since we may be at interrupt priority on
                                 * incoming messages. */
                                list_del(&tp->tp_list);
                                list_add(&tp->tp_list, &cull);
                        }
                        continue;
                }

                if (tp->tp_nid == LNET_NID_ANY ||       /* fail every peer */
                    nid == tp->tp_nid) {                /* fail this peer */
                        fail = 1;

                        if (tp->tp_threshold != LNET_MD_THRESH_INF) {
                                tp->tp_threshold--;
                                if (outgoing &&
                                    tp->tp_threshold == 0) {
                                        /* see above */
                                        list_del(&tp->tp_list);
                                        list_add(&tp->tp_list, &cull);
                                }
                        }
                        break;
                }
        }

        lnet_net_unlock(0);

        while (!list_empty(&cull)) {
                tp = list_entry(cull.next, struct lnet_test_peer, tp_list);
                list_del(&tp->tp_list);

                LIBCFS_FREE(tp, sizeof(*tp));
        }

        return fail;
}

unsigned int
lnet_iov_nob(unsigned int niov, struct kvec *iov)
{
        unsigned int nob = 0;

        LASSERT(niov == 0 || iov != NULL);
        while (niov-- > 0)
                nob += (iov++)->iov_len;

        return (nob);
}
EXPORT_SYMBOL(lnet_iov_nob);

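/* Copy nob bytes from the source kvec array at soffset into the
 * destination kvec array at doffset, walking both vectors fragment by
 * fragment. */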
void
lnet_copy_iov2iov(unsigned int ndiov, struct kvec *diov, unsigned int doffset,
                  unsigned int nsiov, struct kvec *siov, unsigned int soffset,
                  unsigned int nob)
{
        /* NB diov, siov are READ-ONLY */
        unsigned int  this_nob;

        if (nob == 0)
                return;

        /* skip complete frags before 'doffset' */
        LASSERT(ndiov > 0);
        while (doffset >= diov->iov_len) {
                doffset -= diov->iov_len;
                diov++;
                ndiov--;
                LASSERT(ndiov > 0);
        }

        /* skip complete frags before 'soffset' */
        LASSERT(nsiov > 0);
        while (soffset >= siov->iov_len) {
                soffset -= siov->iov_len;
                siov++;
                nsiov--;
                LASSERT(nsiov > 0);
        }

        do {
                LASSERT(ndiov > 0);
                LASSERT(nsiov > 0);
                this_nob = MIN(diov->iov_len - doffset,
                               siov->iov_len - soffset);
                this_nob = MIN(this_nob, nob);

                memcpy((char *)diov->iov_base + doffset,
                       (char *)siov->iov_base + soffset, this_nob);
                nob -= this_nob;

                if (diov->iov_len > doffset + this_nob) {
                        doffset += this_nob;
                } else {
                        diov++;
                        ndiov--;
                        doffset = 0;
                }

                if (siov->iov_len > soffset + this_nob) {
                        soffset += this_nob;
                } else {
                        siov++;
                        nsiov--;
                        soffset = 0;
                }
        } while (nob > 0);
}
EXPORT_SYMBOL(lnet_copy_iov2iov);

int
lnet_extract_iov(int dst_niov, struct kvec *dst,
                 int src_niov, struct kvec *src,
                 unsigned int offset, unsigned int len)
{
        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
         * for exactly 'len' bytes, and return the number of entries.
         * NB not destructive to 'src' */
        unsigned int    frag_len;
        unsigned int    niov;

        if (len == 0)                           /* no data => */
                return (0);                     /* no frags */

        LASSERT(src_niov > 0);
        while (offset >= src->iov_len) {      /* skip initial frags */
                offset -= src->iov_len;
                src_niov--;
                src++;
                LASSERT(src_niov > 0);
        }

        niov = 1;
        for (;;) {
                LASSERT(src_niov > 0);
                LASSERT((int)niov <= dst_niov);

                frag_len = src->iov_len - offset;
                dst->iov_base = ((char *)src->iov_base) + offset;

                if (len <= frag_len) {
                        dst->iov_len = len;
                        return (niov);
                }

                dst->iov_len = frag_len;

                len -= frag_len;
                dst++;
                src++;
                niov++;
                src_niov--;
                offset = 0;
        }
}
EXPORT_SYMBOL(lnet_extract_iov);


unsigned int
lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
{
        unsigned int  nob = 0;

        LASSERT(niov == 0 || kiov != NULL);
        while (niov-- > 0)
                nob += (kiov++)->kiov_len;

        return (nob);
}
EXPORT_SYMBOL(lnet_kiov_nob);

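/* Copy nob bytes between two page (kiov) vectors, kmapping at most one
 * source and one destination page at a time. */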
void
lnet_copy_kiov2kiov(unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
                    unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
                    unsigned int nob)
{
        /* NB diov, siov are READ-ONLY */
        unsigned int    this_nob;
        char           *daddr = NULL;
        char           *saddr = NULL;

        if (nob == 0)
                return;

        LASSERT (!in_interrupt ());

        LASSERT (ndiov > 0);
        while (doffset >= diov->kiov_len) {
                doffset -= diov->kiov_len;
                diov++;
                ndiov--;
                LASSERT(ndiov > 0);
        }

        LASSERT(nsiov > 0);
        while (soffset >= siov->kiov_len) {
                soffset -= siov->kiov_len;
                siov++;
                nsiov--;
                LASSERT(nsiov > 0);
        }

        do {
                LASSERT(ndiov > 0);
                LASSERT(nsiov > 0);
                this_nob = MIN(diov->kiov_len - doffset,
                               siov->kiov_len - soffset);
                this_nob = MIN(this_nob, nob);

                if (daddr == NULL)
                        daddr = ((char *)kmap(diov->kiov_page)) +
                                diov->kiov_offset + doffset;
                if (saddr == NULL)
                        saddr = ((char *)kmap(siov->kiov_page)) +
                                siov->kiov_offset + soffset;

                /* Vanishing risk of kmap deadlock when mapping 2 pages.
                 * However in practice at least one of the kiovs will be mapped
                 * kernel pages and the map/unmap will be NOOPs */

                memcpy (daddr, saddr, this_nob);
                nob -= this_nob;

                if (diov->kiov_len > doffset + this_nob) {
                        daddr += this_nob;
                        doffset += this_nob;
                } else {
                        kunmap(diov->kiov_page);
                        daddr = NULL;
                        diov++;
                        ndiov--;
                        doffset = 0;
                }

                if (siov->kiov_len > soffset + this_nob) {
                        saddr += this_nob;
                        soffset += this_nob;
                } else {
                        kunmap(siov->kiov_page);
                        saddr = NULL;
                        siov++;
                        nsiov--;
                        soffset = 0;
                }
        } while (nob > 0);

        if (daddr != NULL)
                kunmap(diov->kiov_page);
        if (saddr != NULL)
                kunmap(siov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_kiov2kiov);

void
lnet_copy_kiov2iov (unsigned int niov, struct kvec *iov, unsigned int iovoffset,
                    unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
                    unsigned int nob)
{
        /* NB iov, kiov are READ-ONLY */
        unsigned int    this_nob;
        char           *addr = NULL;

        if (nob == 0)
                return;

        LASSERT (!in_interrupt ());

        LASSERT (niov > 0);
        while (iovoffset >= iov->iov_len) {
                iovoffset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT(niov > 0);
        }

        LASSERT(nkiov > 0);
        while (kiovoffset >= kiov->kiov_len) {
                kiovoffset -= kiov->kiov_len;
                kiov++;
                nkiov--;
                LASSERT(nkiov > 0);
        }

        do {
                LASSERT(niov > 0);
                LASSERT(nkiov > 0);
                this_nob = MIN(iov->iov_len - iovoffset,
                               kiov->kiov_len - kiovoffset);
                this_nob = MIN(this_nob, nob);

                if (addr == NULL)
                        addr = ((char *)kmap(kiov->kiov_page)) +
                                kiov->kiov_offset + kiovoffset;

                memcpy((char *)iov->iov_base + iovoffset, addr, this_nob);
                nob -= this_nob;

                if (iov->iov_len > iovoffset + this_nob) {
                        iovoffset += this_nob;
                } else {
                        iov++;
                        niov--;
                        iovoffset = 0;
                }

                if (kiov->kiov_len > kiovoffset + this_nob) {
                        addr += this_nob;
                        kiovoffset += this_nob;
                } else {
                        kunmap(kiov->kiov_page);
                        addr = NULL;
                        kiov++;
                        nkiov--;
                        kiovoffset = 0;
                }

        } while (nob > 0);

        if (addr != NULL)
                kunmap(kiov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_kiov2iov);

void
lnet_copy_iov2kiov(unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
                   unsigned int niov, struct kvec *iov, unsigned int iovoffset,
                   unsigned int nob)
{
        /* NB kiov, iov are READ-ONLY */
        unsigned int    this_nob;
        char           *addr = NULL;

        if (nob == 0)
                return;

        LASSERT (!in_interrupt ());

        LASSERT (nkiov > 0);
        while (kiovoffset >= kiov->kiov_len) {
                kiovoffset -= kiov->kiov_len;
                kiov++;
                nkiov--;
                LASSERT(nkiov > 0);
        }

        LASSERT(niov > 0);
        while (iovoffset >= iov->iov_len) {
                iovoffset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT(niov > 0);
        }

        do {
                LASSERT(nkiov > 0);
                LASSERT(niov > 0);
                this_nob = MIN(kiov->kiov_len - kiovoffset,
                               iov->iov_len - iovoffset);
                this_nob = MIN(this_nob, nob);

                if (addr == NULL)
                        addr = ((char *)kmap(kiov->kiov_page)) +
                                kiov->kiov_offset + kiovoffset;

                memcpy (addr, (char *)iov->iov_base + iovoffset, this_nob);
                nob -= this_nob;

                if (kiov->kiov_len > kiovoffset + this_nob) {
                        addr += this_nob;
                        kiovoffset += this_nob;
                } else {
                        kunmap(kiov->kiov_page);
                        addr = NULL;
                        kiov++;
                        nkiov--;
                        kiovoffset = 0;
                }

                if (iov->iov_len > iovoffset + this_nob) {
                        iovoffset += this_nob;
                } else {
                        iov++;
                        niov--;
                        iovoffset = 0;
                }
        } while (nob > 0);

        if (addr != NULL)
                kunmap(kiov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_iov2kiov);

int
lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
                  int src_niov, lnet_kiov_t *src,
                  unsigned int offset, unsigned int len)
{
        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
         * for exactly 'len' bytes, and return the number of entries.
         * NB not destructive to 'src' */
        unsigned int    frag_len;
        unsigned int    niov;

        if (len == 0)                           /* no data => */
                return (0);                     /* no frags */

        LASSERT(src_niov > 0);
        while (offset >= src->kiov_len) {      /* skip initial frags */
                offset -= src->kiov_len;
                src_niov--;
                src++;
                LASSERT(src_niov > 0);
        }

        niov = 1;
        for (;;) {
                LASSERT(src_niov > 0);
                LASSERT((int)niov <= dst_niov);

                frag_len = src->kiov_len - offset;
                dst->kiov_page = src->kiov_page;
                dst->kiov_offset = src->kiov_offset + offset;

                if (len <= frag_len) {
                        dst->kiov_len = len;
                        LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
                        return niov;
                }

                dst->kiov_len = frag_len;
                LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);

                len -= frag_len;
                dst++;
                src++;
                niov++;
                src_niov--;
                offset = 0;
        }
}
EXPORT_SYMBOL(lnet_extract_kiov);

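/* Hand an incoming message to the NI's LND for receive; if the LND
 * returns an error the message is finalized immediately with that
 * error. */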
void
lnet_ni_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
             int delayed, unsigned int offset, unsigned int mlen,
             unsigned int rlen)
{
        unsigned int  niov = 0;
        struct kvec *iov = NULL;
        lnet_kiov_t  *kiov = NULL;
        int           rc;

        LASSERT (!in_interrupt ());
        LASSERT (mlen == 0 || msg != NULL);

        if (msg != NULL) {
                LASSERT(msg->msg_receiving);
                LASSERT(!msg->msg_sending);
                LASSERT(rlen == msg->msg_len);
                LASSERT(mlen <= msg->msg_len);
                LASSERT(msg->msg_offset == offset);
                LASSERT(msg->msg_wanted == mlen);

                msg->msg_receiving = 0;

                if (mlen != 0) {
                        niov = msg->msg_niov;
                        iov  = msg->msg_iov;
                        kiov = msg->msg_kiov;

                        LASSERT (niov > 0);
                        LASSERT ((iov == NULL) != (kiov == NULL));
                }
        }

        rc = (ni->ni_net->net_lnd->lnd_recv)(ni, private, msg, delayed,
                                             niov, iov, kiov, offset, mlen,
                                             rlen);
        if (rc < 0)
                lnet_finalize(msg, rc);
}

static void
lnet_setpayloadbuffer(struct lnet_msg *msg)
{
        struct lnet_libmd *md = msg->msg_md;

        LASSERT(msg->msg_len > 0);
        LASSERT(!msg->msg_routing);
        LASSERT(md != NULL);
        LASSERT(msg->msg_niov == 0);
        LASSERT(msg->msg_iov == NULL);
        LASSERT(msg->msg_kiov == NULL);

        msg->msg_niov = md->md_niov;
        if ((md->md_options & LNET_MD_KIOV) != 0)
                msg->msg_kiov = md->md_iov.kiov;
        else
                msg->msg_iov = md->md_iov.iov;
}

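/* Initialize the common fields of an outgoing message and its wire
 * header; the payload buffer is taken from the attached MD. */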
void
lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target,
               unsigned int offset, unsigned int len)
{
        msg->msg_type = type;
        msg->msg_target = target;
        msg->msg_len = len;
        msg->msg_offset = offset;

        if (len != 0)
                lnet_setpayloadbuffer(msg);

        memset (&msg->msg_hdr, 0, sizeof (msg->msg_hdr));
        msg->msg_hdr.type           = cpu_to_le32(type);
        /* dest_nid will be overwritten by lnet_select_pathway() */
        msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
        msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
        /* src_nid will be set later */
        msg->msg_hdr.src_pid        = cpu_to_le32(the_lnet.ln_pid);
        msg->msg_hdr.payload_length = cpu_to_le32(len);
}

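/* Pass a fully credited message to the LND for transmission; a
 * synchronous LND failure finalizes the message and disables resend. */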
static void
lnet_ni_send(struct lnet_ni *ni, struct lnet_msg *msg)
{
        void   *priv = msg->msg_private;
        int rc;

        LASSERT (!in_interrupt ());
        LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
                 (msg->msg_txcredit && msg->msg_peertxcredit));

        rc = (ni->ni_net->net_lnd->lnd_send)(ni, priv, msg);
        if (rc < 0) {
                msg->msg_no_resend = true;
                lnet_finalize(msg, rc);
        }
}

static int
lnet_ni_eager_recv(struct lnet_ni *ni, struct lnet_msg *msg)
{
        int     rc;

        LASSERT(!msg->msg_sending);
        LASSERT(msg->msg_receiving);
        LASSERT(!msg->msg_rx_ready_delay);
        LASSERT(ni->ni_net->net_lnd->lnd_eager_recv != NULL);

        msg->msg_rx_ready_delay = 1;
        rc = (ni->ni_net->net_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
                                                  &msg->msg_private);
        if (rc != 0) {
                CERROR("recv from %s / send to %s aborted: "
                       "eager_recv failed %d\n",
                       libcfs_nid2str(msg->msg_rxpeer->lpni_nid),
                       libcfs_id2str(msg->msg_target), rc);
                LASSERT(rc < 0); /* required by my callers */
        }

        return rc;
}

/* NB: returns 1 when alive, 0 when dead, negative when error;
 *     may drop the lnet_net_lock */
static int
lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lpni,
                       struct lnet_msg *msg)
{
        if (!lnet_peer_aliveness_enabled(lpni))
                return -ENODEV;

        /*
         * If we're resending a message, let's attempt to send it even if
         * the peer is down to fulfill our resend quota on the message
         */
        if (msg->msg_retry_count > 0)
                return 1;

        /* try to send recovery messages regardless */
        if (msg->msg_recovery)
                return 1;

        /* always send any responses */
        if (msg->msg_type == LNET_MSG_ACK ||
            msg->msg_type == LNET_MSG_REPLY)
                return 1;

        return lnet_is_peer_ni_alive(lpni);
}

/**
 * \param msg The message to be sent.
 * \param do_send True if lnet_ni_send() should be called in this function.
 *        lnet_send() is going to lnet_net_unlock immediately after this, so
 *        it sets do_send FALSE and I don't do the unlock/send/lock bit.
 *
 * \retval LNET_CREDIT_OK If \a msg sent or OK to send.
 * \retval LNET_CREDIT_WAIT If \a msg blocked for credit.
 * \retval -EHOSTUNREACH If the next hop of the message appears dead.
 * \retval -ECANCELED If the MD of the message has been unlinked.
 */
static int
lnet_post_send_locked(struct lnet_msg *msg, int do_send)
{
        struct lnet_peer_ni     *lp = msg->msg_txpeer;
        struct lnet_ni          *ni = msg->msg_txni;
        int                     cpt = msg->msg_tx_cpt;
        struct lnet_tx_queue    *tq = ni->ni_tx_queues[cpt];

        /* non-lnet_send() callers have checked before */
        LASSERT(!do_send || msg->msg_tx_delayed);
        LASSERT(!msg->msg_receiving);
        LASSERT(msg->msg_tx_committed);
        /* can't get here if we're sending to the loopback interface */
        LASSERT(lp->lpni_nid != the_lnet.ln_loni->ni_nid);

        /* NB 'lp' is always the next hop */
        if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
            lnet_peer_alive_locked(ni, lp, msg) == 0) {
                the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
                the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length +=
                        msg->msg_len;
                lnet_net_unlock(cpt);
                if (msg->msg_txpeer)
                        lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
                                        msg->msg_type,
                                        LNET_STATS_TYPE_DROP);
                if (msg->msg_txni)
                        lnet_incr_stats(&msg->msg_txni->ni_stats,
                                        msg->msg_type,
                                        LNET_STATS_TYPE_DROP);

                CNETERR("Dropping message for %s: peer not alive\n",
                        libcfs_id2str(msg->msg_target));
                msg->msg_health_status = LNET_MSG_STATUS_REMOTE_DROPPED;
                if (do_send)
                        lnet_finalize(msg, -EHOSTUNREACH);

                lnet_net_lock(cpt);
                return -EHOSTUNREACH;
        }

        if (msg->msg_md != NULL &&
            (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED) != 0) {
                lnet_net_unlock(cpt);

                CNETERR("Aborting message for %s: LNetM[DE]Unlink() already "
                        "called on the MD/ME.\n",
                        libcfs_id2str(msg->msg_target));
                if (do_send) {
                        msg->msg_no_resend = true;
                        CDEBUG(D_NET, "msg %p to %s canceled and will not be resent\n",
                               msg, libcfs_id2str(msg->msg_target));
                        lnet_finalize(msg, -ECANCELED);
                }

                lnet_net_lock(cpt);
                return -ECANCELED;
        }

        if (!msg->msg_peertxcredit) {
                spin_lock(&lp->lpni_lock);
                LASSERT((lp->lpni_txcredits < 0) ==
                        !list_empty(&lp->lpni_txq));

                msg->msg_peertxcredit = 1;
                lp->lpni_txqnob += msg->msg_len + sizeof(struct lnet_hdr);
                lp->lpni_txcredits--;

                if (lp->lpni_txcredits < lp->lpni_mintxcredits)
                        lp->lpni_mintxcredits = lp->lpni_txcredits;

                if (lp->lpni_txcredits < 0) {
                        msg->msg_tx_delayed = 1;
                        list_add_tail(&msg->msg_list, &lp->lpni_txq);
                        spin_unlock(&lp->lpni_lock);
                        return LNET_CREDIT_WAIT;
                }
                spin_unlock(&lp->lpni_lock);
        }

        if (!msg->msg_txcredit) {
                LASSERT((tq->tq_credits < 0) ==
                        !list_empty(&tq->tq_delayed));

                msg->msg_txcredit = 1;
                tq->tq_credits--;
                atomic_dec(&ni->ni_tx_credits);

                if (tq->tq_credits < tq->tq_credits_min)
                        tq->tq_credits_min = tq->tq_credits;

                if (tq->tq_credits < 0) {
                        msg->msg_tx_delayed = 1;
                        list_add_tail(&msg->msg_list, &tq->tq_delayed);
                        return LNET_CREDIT_WAIT;
                }
        }

        /* unset the tx_delay flag as we're going to send it now */
        msg->msg_tx_delayed = 0;

        if (do_send) {
                lnet_net_unlock(cpt);
                lnet_ni_send(ni, msg);
                lnet_net_lock(cpt);
        }
        return LNET_CREDIT_OK;
}

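/* Select the router buffer pool whose buffers are large enough to hold
 * this message; pools are ordered by increasing page count. */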
static struct lnet_rtrbufpool *
lnet_msg2bufpool(struct lnet_msg *msg)
{
        struct lnet_rtrbufpool  *rbp;
        int                     cpt;

        LASSERT(msg->msg_rx_committed);

        cpt = msg->msg_rx_cpt;
        rbp = &the_lnet.ln_rtrpools[cpt][0];

        LASSERT(msg->msg_len <= LNET_MTU);
        while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_SIZE) {
                rbp++;
                LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
        }

        return rbp;
}

static int
lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
{
        /* lnet_parse is going to lnet_net_unlock immediately after this, so it
         * sets do_recv FALSE and I don't do the unlock/send/lock bit.
         * I return LNET_CREDIT_WAIT if msg blocked and LNET_CREDIT_OK if
         * received or OK to receive */
        struct lnet_peer_ni *lpni = msg->msg_rxpeer;
        struct lnet_peer *lp;
        struct lnet_rtrbufpool *rbp;
        struct lnet_rtrbuf *rb;

        LASSERT(msg->msg_iov == NULL);
        LASSERT(msg->msg_kiov == NULL);
        LASSERT(msg->msg_niov == 0);
        LASSERT(msg->msg_routing);
        LASSERT(msg->msg_receiving);
        LASSERT(!msg->msg_sending);
        LASSERT(lpni->lpni_peer_net);
        LASSERT(lpni->lpni_peer_net->lpn_peer);

        lp = lpni->lpni_peer_net->lpn_peer;

        /* non-lnet_parse callers only receive delayed messages */
        LASSERT(!do_recv || msg->msg_rx_delayed);

        if (!msg->msg_peerrtrcredit) {
                /* lpni_lock protects the credit manipulation */
                spin_lock(&lpni->lpni_lock);
                /* lp_lock protects the lp_rtrq */
                spin_lock(&lp->lp_lock);

                msg->msg_peerrtrcredit = 1;
                lpni->lpni_rtrcredits--;
                if (lpni->lpni_rtrcredits < lpni->lpni_minrtrcredits)
                        lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;

                if (lpni->lpni_rtrcredits < 0) {
                        /* must have checked eager_recv before here */
                        LASSERT(msg->msg_rx_ready_delay);
                        msg->msg_rx_delayed = 1;
                        list_add_tail(&msg->msg_list, &lp->lp_rtrq);
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&lpni->lpni_lock);
                        return LNET_CREDIT_WAIT;
                }
                spin_unlock(&lp->lp_lock);
                spin_unlock(&lpni->lpni_lock);
        }

        rbp = lnet_msg2bufpool(msg);

        if (!msg->msg_rtrcredit) {
                msg->msg_rtrcredit = 1;
                rbp->rbp_credits--;
                if (rbp->rbp_credits < rbp->rbp_mincredits)
                        rbp->rbp_mincredits = rbp->rbp_credits;

                if (rbp->rbp_credits < 0) {
                        /* must have checked eager_recv before here */
                        LASSERT(msg->msg_rx_ready_delay);
                        msg->msg_rx_delayed = 1;
                        list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
                        return LNET_CREDIT_WAIT;
                }
        }

        LASSERT(!list_empty(&rbp->rbp_bufs));
        rb = list_entry(rbp->rbp_bufs.next, struct lnet_rtrbuf, rb_list);
        list_del(&rb->rb_list);

        msg->msg_niov = rbp->rbp_npages;
        msg->msg_kiov = &rb->rb_kiov[0];

        /* unset the msg_rx_delayed flag since we're receiving the message */
        msg->msg_rx_delayed = 0;

        if (do_recv) {
                int cpt = msg->msg_rx_cpt;

                lnet_net_unlock(cpt);
                lnet_ni_recv(msg->msg_rxni, msg->msg_private, msg, 1,
                             0, msg->msg_len, msg->msg_len);
                lnet_net_lock(cpt);
        }
        return LNET_CREDIT_OK;
}

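/* Return the NI and peer tx credits held by a completed message and,
 * if other messages were blocked waiting on those credits, kick the
 * next one. */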
void
lnet_return_tx_credits_locked(struct lnet_msg *msg)
{
        struct lnet_peer_ni     *txpeer = msg->msg_txpeer;
        struct lnet_ni          *txni = msg->msg_txni;
        struct lnet_msg         *msg2;

        if (msg->msg_txcredit) {
                struct lnet_ni       *ni = msg->msg_txni;
                struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];

                /* give back NI txcredits */
                msg->msg_txcredit = 0;

                LASSERT((tq->tq_credits < 0) ==
                        !list_empty(&tq->tq_delayed));

                tq->tq_credits++;
                atomic_inc(&ni->ni_tx_credits);
                if (tq->tq_credits <= 0) {
                        msg2 = list_entry(tq->tq_delayed.next,
                                          struct lnet_msg, msg_list);
                        list_del(&msg2->msg_list);

                        LASSERT(msg2->msg_txni == ni);
                        LASSERT(msg2->msg_tx_delayed);
                        LASSERT(msg2->msg_tx_cpt == msg->msg_tx_cpt);

                        (void) lnet_post_send_locked(msg2, 1);
                }
        }

        if (msg->msg_peertxcredit) {
                /* give back peer txcredits */
                msg->msg_peertxcredit = 0;

                spin_lock(&txpeer->lpni_lock);
                LASSERT((txpeer->lpni_txcredits < 0) ==
                        !list_empty(&txpeer->lpni_txq));

                txpeer->lpni_txqnob -= msg->msg_len + sizeof(struct lnet_hdr);
                LASSERT(txpeer->lpni_txqnob >= 0);

                txpeer->lpni_txcredits++;
                if (txpeer->lpni_txcredits <= 0) {
                        int msg2_cpt;

                        msg2 = list_entry(txpeer->lpni_txq.next,
                                              struct lnet_msg, msg_list);
                        list_del(&msg2->msg_list);
                        spin_unlock(&txpeer->lpni_lock);

                        LASSERT(msg2->msg_txpeer == txpeer);
                        LASSERT(msg2->msg_tx_delayed);

                        msg2_cpt = msg2->msg_tx_cpt;

                        /*
                         * The msg_cpt can be different from the msg2_cpt
                         * so we need to make sure we lock the correct cpt
                         * for msg2.
                         * Once we call lnet_post_send_locked() it is no
                         * longer safe to access msg2, since it could've
                         * been freed by lnet_finalize(), but we still
                         * need to relock the correct cpt, so we cache the
                         * msg2_cpt for the purpose of the check that
                         * follows the call to lnet_post_send_locked().
                         */
                        if (msg2_cpt != msg->msg_tx_cpt) {
                                lnet_net_unlock(msg->msg_tx_cpt);
                                lnet_net_lock(msg2_cpt);
                        }
                        (void) lnet_post_send_locked(msg2, 1);
                        if (msg2_cpt != msg->msg_tx_cpt) {
                                lnet_net_unlock(msg2_cpt);
                                lnet_net_lock(msg->msg_tx_cpt);
                        }
                } else {
                        spin_unlock(&txpeer->lpni_lock);
                }
        }

        if (txni != NULL) {
                msg->msg_txni = NULL;
                lnet_ni_decref_locked(txni, msg->msg_tx_cpt);
        }

        if (txpeer != NULL) {
                msg->msg_txpeer = NULL;
                lnet_peer_ni_decref_locked(txpeer);
        }
}

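/* Wake the first message blocked waiting for a buffer in this router
 * buffer pool and retry its routed receive. */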
void
lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp)
{
        struct lnet_msg *msg;

        if (list_empty(&rbp->rbp_msgs))
                return;
        msg = list_entry(rbp->rbp_msgs.next,
                         struct lnet_msg, msg_list);
        list_del(&msg->msg_list);

        (void)lnet_post_routed_recv_locked(msg, 1);
}

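/* Drop every routed message on the list: tell the LND to discard the
 * payload and finalize each message with -ECANCELED. */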
void
lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
{
        struct lnet_msg *msg;
        struct lnet_msg *tmp;

        lnet_net_unlock(cpt);

        list_for_each_entry_safe(msg, tmp, list, msg_list) {
                lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
                             0, 0, 0, msg->msg_hdr.payload_length);
                list_del_init(&msg->msg_list);
                msg->msg_no_resend = true;
                msg->msg_health_status = LNET_MSG_STATUS_REMOTE_ERROR;
                lnet_finalize(msg, -ECANCELED);
        }

        lnet_net_lock(cpt);
}

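/* Return the router buffer and peer router credits held by a routed
 * message, handing the freed buffer or credit to the next blocked
 * message if there is one. */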
void
lnet_return_rx_credits_locked(struct lnet_msg *msg)
{
        struct lnet_peer_ni *rxpeerni = msg->msg_rxpeer;
        struct lnet_peer *lp;
        struct lnet_ni *rxni = msg->msg_rxni;
        struct lnet_msg *msg2;

        if (msg->msg_rtrcredit) {
                /* give back global router credits */
                struct lnet_rtrbuf *rb;
                struct lnet_rtrbufpool *rbp;

                /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
                 * there until it gets one allocated, or aborts the wait
                 * itself */
                LASSERT(msg->msg_kiov != NULL);

                rb = list_entry(msg->msg_kiov, struct lnet_rtrbuf, rb_kiov[0]);
                rbp = rb->rb_pool;

                msg->msg_kiov = NULL;
                msg->msg_rtrcredit = 0;

                LASSERT(rbp == lnet_msg2bufpool(msg));

                LASSERT((rbp->rbp_credits > 0) ==
                        !list_empty(&rbp->rbp_bufs));

                /* If routing is now turned off, we just drop this buffer and
                 * don't bother trying to return credits.  */
                if (!the_lnet.ln_routing) {
                        lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
                        goto routing_off;
                }

                /* It is possible that a user has lowered the desired number of
                 * buffers in this pool.  Make sure we never put back
                 * more buffers than the stated number. */
                if (unlikely(rbp->rbp_credits >= rbp->rbp_req_nbuffers)) {
                        /* Discard this buffer so we don't have too
                         * many. */
                        lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
                        rbp->rbp_nbuffers--;
                } else {
                        list_add(&rb->rb_list, &rbp->rbp_bufs);
                        rbp->rbp_credits++;
                        if (rbp->rbp_credits <= 0)
                                lnet_schedule_blocked_locked(rbp);
                }
        }

routing_off:
        if (msg->msg_peerrtrcredit) {
                LASSERT(rxpeerni);
                LASSERT(rxpeerni->lpni_peer_net);
                LASSERT(rxpeerni->lpni_peer_net->lpn_peer);

                lp = rxpeerni->lpni_peer_net->lpn_peer;

                /* give back peer router credits */
                msg->msg_peerrtrcredit = 0;

                spin_lock(&rxpeerni->lpni_lock);
                spin_lock(&lp->lp_lock);

                rxpeerni->lpni_rtrcredits++;

                /* drop all messages which are queued to be routed on that
                 * peer. */
                if (!the_lnet.ln_routing) {
                        struct list_head drop;
                        INIT_LIST_HEAD(&drop);
                        list_splice_init(&lp->lp_rtrq, &drop);
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&rxpeerni->lpni_lock);
                        lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt);
                } else if (!list_empty(&lp->lp_rtrq)) {
                        int msg2_cpt;

                        msg2 = list_entry(lp->lp_rtrq.next,
                                          struct lnet_msg, msg_list);
                        list_del(&msg2->msg_list);
                        msg2_cpt = msg2->msg_rx_cpt;
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&rxpeerni->lpni_lock);
                        /*
                         * messages on the lp_rtrq can be from any NID in
                         * the peer, which means they might have different
                         * cpts. We need to make sure we lock the right
                         * one.
                         */
                        if (msg2_cpt != msg->msg_rx_cpt) {
                                lnet_net_unlock(msg->msg_rx_cpt);
                                lnet_net_lock(msg2_cpt);
                        }
                        (void) lnet_post_routed_recv_locked(msg2, 1);
                        if (msg2_cpt != msg->msg_rx_cpt) {
                                lnet_net_unlock(msg2_cpt);
                                lnet_net_lock(msg->msg_rx_cpt);
                        }
                } else {
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&rxpeerni->lpni_lock);
                }
        }
        if (rxni != NULL) {
                msg->msg_rxni = NULL;
                lnet_ni_decref_locked(rxni, msg->msg_rx_cpt);
        }
        if (rxpeerni != NULL) {
                msg->msg_rxpeer = NULL;
                lnet_peer_ni_decref_locked(rxpeerni);
        }
}

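/* Compare two peer NIs by queued bytes, then by available tx credits;
 * returns 1 if p1 is the better choice, -1 if p2 is, 0 on a tie. */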
static int
lnet_compare_peers(struct lnet_peer_ni *p1, struct lnet_peer_ni *p2)
{
        if (p1->lpni_txqnob < p2->lpni_txqnob)
                return 1;

        if (p1->lpni_txqnob > p2->lpni_txqnob)
                return -1;

        if (p1->lpni_txcredits > p2->lpni_txcredits)
                return 1;

        if (p1->lpni_txcredits < p2->lpni_txcredits)
                return -1;

        return 0;
}

static struct lnet_peer_ni *
lnet_select_peer_ni(struct lnet_ni *best_ni, lnet_nid_t dst_nid,
                    struct lnet_peer *peer,
                    struct lnet_peer_net *peer_net)
{
        /*
         * Look at the peer NIs for the destination peer that connect
         * to the chosen net. If a peer_ni is preferred when using the
         * best_ni to communicate, we use that one. If there is no
         * preferred peer_ni, or there are multiple preferred peer_ni,
         * the available transmit credits are used. If the transmit
         * credits are equal, we round-robin over the peer_ni.
         */
        struct lnet_peer_ni *lpni = NULL;
        struct lnet_peer_ni *best_lpni = NULL;
        int best_lpni_credits = INT_MIN;
        bool preferred = false;
        bool ni_is_pref;
        int best_lpni_healthv = 0;
        int lpni_healthv;

        while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
                /*
                 * if the best_ni we've chosen already has this lpni
                 * preferred, then let's use it
                 */
                if (best_ni) {
                        ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
                                                                best_ni->ni_nid);
                        CDEBUG(D_NET, "%s ni_is_pref = %d\n",
                               libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
                } else {
                        ni_is_pref = false;
                }

                lpni_healthv = atomic_read(&lpni->lpni_healthv);

                if (best_lpni)
                        CDEBUG(D_NET, "%s c:[%d, %d], s:[%d, %d]\n",
                                libcfs_nid2str(lpni->lpni_nid),
                                lpni->lpni_txcredits, best_lpni_credits,
                                lpni->lpni_seq, best_lpni->lpni_seq);

                /* pick the healthiest peer ni */
                if (lpni_healthv < best_lpni_healthv) {
                        continue;
                } else if (lpni_healthv > best_lpni_healthv) {
                        best_lpni_healthv = lpni_healthv;
                /* if this is a preferred peer use it */
                } else if (!preferred && ni_is_pref) {
                        preferred = true;
                } else if (preferred && !ni_is_pref) {
                        /*
                         * this is not the preferred peer so let's ignore
                         * it.
                         */
                        continue;
                } else if (lpni->lpni_txcredits < best_lpni_credits) {
                        /*
                         * We already have a peer that has more credits
                         * available than this one. No need to consider
                         * this peer further.
                         */
                        continue;
                } else if (lpni->lpni_txcredits == best_lpni_credits) {
                        /*
                         * The best peer found so far and the current peer
                         * have the same number of available credits let's
                         * make sure to select between them using Round
                         * Robin
                         */
                        if (best_lpni) {
                                if (best_lpni->lpni_seq <= lpni->lpni_seq)
                                        continue;
                        }
                }

                best_lpni = lpni;
                best_lpni_credits = lpni->lpni_txcredits;
        }

        /* if we still can't find a peer ni then we can't reach it */
        if (!best_lpni) {
                __u32 net_id = (peer_net) ? peer_net->lpn_net_id :
                        LNET_NIDNET(dst_nid);
                CDEBUG(D_NET, "no peer_ni found on peer net %s\n",
                                libcfs_net2str(net_id));
                return NULL;
        }

        CDEBUG(D_NET, "sd_best_lpni = %s\n",
               libcfs_nid2str(best_lpni->lpni_nid));

        return best_lpni;
}

/*
 * Prerequisite: the best_ni should already be set in the sd
 */
static inline struct lnet_peer_ni *
lnet_find_best_lpni_on_net(struct lnet_send_data *sd, struct lnet_peer *peer,
                           __u32 net_id)
{
        struct lnet_peer_net *peer_net;

        /*
         * The gateway is Multi-Rail capable so now we must select the
         * proper peer_ni
         */
        peer_net = lnet_peer_get_net_locked(peer, net_id);

        if (!peer_net) {
                CERROR("gateway peer %s has no NI on net %s\n",
                       libcfs_nid2str(peer->lp_primary_nid),
                       libcfs_net2str(net_id));
                return NULL;
        }

        return lnet_select_peer_ni(sd->sd_best_ni, sd->sd_dst_nid,
                                   peer, peer_net);
}

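/* Compare two routes by gateway priority, then hop count, then the
 * state of their gateway peer NIs (queued bytes and credits), falling
 * back to round-robin by route sequence number; the peer NI to use on
 * the preferred route is returned through best_lpni. */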
1439 static int
1440 lnet_compare_routes(struct lnet_route *r1, struct lnet_route *r2,
1441                     struct lnet_peer_ni **best_lpni)
1442 {
1443         int r1_hops = (r1->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r1->lr_hops;
1444         int r2_hops = (r2->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r2->lr_hops;
1445         struct lnet_peer *lp1 = r1->lr_gateway;
1446         struct lnet_peer *lp2 = r2->lr_gateway;
1447         struct lnet_peer_ni *lpni1;
1448         struct lnet_peer_ni *lpni2;
1449         struct lnet_send_data sd;
1450         int rc;
1451
1452         sd.sd_best_ni = NULL;
1453         sd.sd_dst_nid = LNET_NID_ANY;
1454         lpni1 = lnet_find_best_lpni_on_net(&sd, lp1, r1->lr_lnet);
1455         lpni2 = lnet_find_best_lpni_on_net(&sd, lp2, r2->lr_lnet);
1456         LASSERT(lpni1 && lpni2);
1457
1458         if (r1->lr_priority < r2->lr_priority) {
1459                 *best_lpni = lpni1;
1460                 return 1;
1461         }
1462
1463         if (r1->lr_priority > r2->lr_priority) {
1464                 *best_lpni = lpni2;
1465                 return -1;
1466         }
1467
1468         if (r1_hops < r2_hops) {
1469                 *best_lpni = lpni1;
1470                 return 1;
1471         }
1472
1473         if (r1_hops > r2_hops) {
1474                 *best_lpni = lpni2;
1475                 return -1;
1476         }
1477
1478         rc = lnet_compare_peers(lpni1, lpni2);
1479         if (rc == 1) {
1480                 *best_lpni = lpni1;
1481                 return rc;
1482         } else if (rc == -1) {
1483                 *best_lpni = lpni2;
1484                 return rc;
1485         }
1486
1487         if (r1->lr_seq - r2->lr_seq <= 0) {
1488                 *best_lpni = lpni1;
1489                 return 1;
1490         }
1491
1492         *best_lpni = lpni2;
1493         return -1;
1494 }
1495
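/*
 * Pick the best live route to @remote_net. *prev_route is set to the
 * route with the highest sequence number (i.e. the most recently used
 * one) and *gwni to the peer NI of the selected gateway.
 */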
1496 static struct lnet_route *
1497 lnet_find_route_locked(struct lnet_net *net, __u32 remote_net,
1498                        lnet_nid_t rtr_nid, struct lnet_route **prev_route,
1499                        struct lnet_peer_ni **gwni)
1500 {
1501         struct lnet_peer_ni *best_gw_ni = NULL;
1502         struct lnet_route *best_route;
1503         struct lnet_route *last_route;
1504         struct lnet_remotenet *rnet;
1505         struct lnet_peer *lp_best;
1506         struct lnet_route *route;
1507         struct lnet_peer *lp;
1508         int rc;
1509
1510         /* If @rtr_nid is not LNET_NID_ANY, return the gateway with
1511          * rtr_nid nid, otherwise find the best gateway I can use */
1512
1513         rnet = lnet_find_rnet_locked(remote_net);
1514         if (rnet == NULL)
1515                 return NULL;
1516
1517         lp_best = NULL;
1518         best_route = last_route = NULL;
1519         list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
1520                 lp = route->lr_gateway;
1521
1522                 if (!lnet_is_route_alive(route))
1523                         continue;
1524
1525                 if (lp_best == NULL) {
1526                         best_route = last_route = route;
1527                         lp_best = lp;
1528                 }
1529
1530                 /* no protection on below fields, but it's harmless */
1531                 if (last_route->lr_seq - route->lr_seq < 0)
1532                         last_route = route;
1533
1534                 rc = lnet_compare_routes(route, best_route, &best_gw_ni);
1535                 if (rc < 0)
1536                         continue;
1537
1538                 best_route = route;
1539                 lp_best = lp;
1540         }
1541
1542         *prev_route = last_route;
1543         *gwni = best_gw_ni;
1544
1545         return best_route;
1546 }
1547
1548 static struct lnet_ni *
1549 lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *best_ni,
1550                  struct lnet_peer *peer, struct lnet_peer_net *peer_net,
1551                  int md_cpt)
1552 {
1553         struct lnet_ni *ni = NULL;
1554         unsigned int shortest_distance;
1555         int best_credits;
1556         int best_healthv;
1557
1558         /*
1559          * If there is no peer_ni that we can send to on this network,
1560          * then there is no point in looking for a new best_ni here.
1561          */
1562         if (!lnet_get_next_peer_ni_locked(peer, peer_net, NULL))
1563                 return best_ni;
1564
1565         if (best_ni == NULL) {
1566                 shortest_distance = UINT_MAX;
1567                 best_credits = INT_MIN;
1568                 best_healthv = 0;
1569         } else {
1570                 shortest_distance = cfs_cpt_distance(lnet_cpt_table(), md_cpt,
1571                                                      best_ni->ni_dev_cpt);
1572                 best_credits = atomic_read(&best_ni->ni_tx_credits);
1573                 best_healthv = atomic_read(&best_ni->ni_healthv);
1574         }
1575
1576         while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
1577                 unsigned int distance;
1578                 int ni_credits;
1579                 int ni_healthv;
1580                 int ni_fatal;
1581
1582                 ni_credits = atomic_read(&ni->ni_tx_credits);
1583                 ni_healthv = atomic_read(&ni->ni_healthv);
1584                 ni_fatal = atomic_read(&ni->ni_fatal_error_on);
1585
1586                 /*
1587                  * calculate the distance from the CPT on which
1588                  * the message memory is allocated to the CPT of
1589                  * the NI's physical device
1590                  */
1591                 distance = cfs_cpt_distance(lnet_cpt_table(),
1592                                             md_cpt,
1593                                             ni->ni_dev_cpt);
1594
1595                 CDEBUG(D_NET, "compare ni %s [c:%d, d:%d, s:%d] with best_ni %s [c:%d, d:%d, s:%d]\n",
1596                        libcfs_nid2str(ni->ni_nid), ni_credits, distance,
1597                        ni->ni_seq, (best_ni) ? libcfs_nid2str(best_ni->ni_nid)
1598                         : "not selected", best_credits, shortest_distance,
1599                         (best_ni) ? best_ni->ni_seq : 0);
1600
1601                 /*
1602                  * All distances smaller than the NUMA range
1603                  * are treated equally.
1604                  */
1605                 if (distance < lnet_numa_range)
1606                         distance = lnet_numa_range;
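                /*
                 * Illustrative example: with lnet_numa_range == 2, raw
                 * distances 0, 1 and 2 all compare as 2 here, so the
                 * distance checks below cannot separate those NIs; health,
                 * credits and round-robin decide between them instead.
                 */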
1607
1608                 /*
1609                  * Select on health, shorter distance, available
1610                  * credits, then round-robin.
1611                  */
1612                 if (ni_fatal) {
1613                         continue;
1614                 } else if (ni_healthv < best_healthv) {
1615                         continue;
1616                 } else if (ni_healthv > best_healthv) {
1617                         best_healthv = ni_healthv;
1618                         /*
1619                          * If we're going to prefer this ni because it's
1620                          * the healthiest, then we should set the
1621                          * shortest_distance in the algorithm in case
1622                          * there are multiple NIs with the same health but
1623                          * different distances.
1624                          */
1625                         if (distance < shortest_distance)
1626                                 shortest_distance = distance;
1627                 } else if (distance > shortest_distance) {
1628                         continue;
1629                 } else if (distance < shortest_distance) {
1630                         shortest_distance = distance;
1631                 } else if (ni_credits < best_credits) {
1632                         continue;
1633                 } else if (ni_credits == best_credits) {
1634                         if (best_ni && best_ni->ni_seq <= ni->ni_seq)
1635                                 continue;
1636                 }
1637                 best_ni = ni;
1638                 best_credits = ni_credits;
1639         }
1640
1641         CDEBUG(D_NET, "selected best_ni %s\n",
1642                (best_ni) ? libcfs_nid2str(best_ni->ni_nid) : "no selection");
1643
1644         return best_ni;
1645 }
1646
1647 /*
1648  * Traffic to the LNET_RESERVED_PORTAL may not trigger peer discovery,
1649  * because such traffic is required to perform discovery. We therefore
1650  * exclude all GET and PUT on that portal. We also exclude all ACK and
1651  * REPLY traffic, but that is because the portal is not tracked in the
1652  * message structure for these message types. We could restrict this
1653  * further by also checking for LNET_PROTO_PING_MATCHBITS.
1654  */
1655 static bool
1656 lnet_msg_discovery(struct lnet_msg *msg)
1657 {
1658         if (msg->msg_type == LNET_MSG_PUT) {
1659                 if (msg->msg_hdr.msg.put.ptl_index != LNET_RESERVED_PORTAL)
1660                         return true;
1661         } else if (msg->msg_type == LNET_MSG_GET) {
1662                 if (msg->msg_hdr.msg.get.ptl_index != LNET_RESERVED_PORTAL)
1663                         return true;
1664         }
1665         return false;
1666 }
1667
1668 #define SRC_SPEC        0x0001
1669 #define SRC_ANY         0x0002
1670 #define LOCAL_DST       0x0004
1671 #define REMOTE_DST      0x0008
1672 #define MR_DST          0x0010
1673 #define NMR_DST         0x0020
1674 #define SND_RESP        0x0040
1675
1676 /* The following two defines are used for return codes */
1677 #define REPEAT_SEND     0x1000
1678 #define PASS_THROUGH    0x2000
1679
1680 /* The different cases lnet_select pathway needs to handle */
1681 #define SRC_SPEC_LOCAL_MR_DST   (SRC_SPEC | LOCAL_DST | MR_DST)
1682 #define SRC_SPEC_ROUTER_MR_DST  (SRC_SPEC | REMOTE_DST | MR_DST)
1683 #define SRC_SPEC_LOCAL_NMR_DST  (SRC_SPEC | LOCAL_DST | NMR_DST)
1684 #define SRC_SPEC_ROUTER_NMR_DST (SRC_SPEC | REMOTE_DST | NMR_DST)
1685 #define SRC_ANY_LOCAL_MR_DST    (SRC_ANY | LOCAL_DST | MR_DST)
1686 #define SRC_ANY_ROUTER_MR_DST   (SRC_ANY | REMOTE_DST | MR_DST)
1687 #define SRC_ANY_LOCAL_NMR_DST   (SRC_ANY | LOCAL_DST | NMR_DST)
1688 #define SRC_ANY_ROUTER_NMR_DST  (SRC_ANY | REMOTE_DST | NMR_DST)
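/*
 * For example, SRC_SPEC_LOCAL_MR_DST works out to 0x0001 | 0x0004 | 0x0010
 * == 0x0015, and SRC_ANY_ROUTER_NMR_DST to 0x0002 | 0x0008 | 0x0020 ==
 * 0x002a. lnet_handle_send_case_locked() switches on these composite
 * values to pick the handler for each case.
 */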
1689
1690 static int
1691 lnet_handle_lo_send(struct lnet_send_data *sd)
1692 {
1693         struct lnet_msg *msg = sd->sd_msg;
1694         int cpt = sd->sd_cpt;
1695
1696         /* No send credit hassles with LOLND */
1697         lnet_ni_addref_locked(the_lnet.ln_loni, cpt);
1698         msg->msg_hdr.dest_nid = cpu_to_le64(the_lnet.ln_loni->ni_nid);
1699         if (!msg->msg_routing)
1700                 msg->msg_hdr.src_nid =
1701                         cpu_to_le64(the_lnet.ln_loni->ni_nid);
1702         msg->msg_target.nid = the_lnet.ln_loni->ni_nid;
1703         lnet_msg_commit(msg, cpt);
1704         msg->msg_txni = the_lnet.ln_loni;
1705
1706         return LNET_CREDIT_OK;
1707 }
1708
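/*
 * Commit the message to the best_ni/best_lpni pair selected in @sd and
 * post it via lnet_post_send_locked(). The lnet_net_lock() may be dropped
 * and re-taken if the commit CPT differs from the currently held one; if
 * the configuration changed in that window, REPEAT_SEND is returned so
 * the caller can redo the selection.
 */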
1709 static int
1710 lnet_handle_send(struct lnet_send_data *sd)
1711 {
1712         struct lnet_ni *best_ni = sd->sd_best_ni;
1713         struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
1714         struct lnet_peer_ni *final_dst_lpni = sd->sd_final_dst_lpni;
1715         struct lnet_msg *msg = sd->sd_msg;
1716         int cpt2;
1717         __u32 send_case = sd->sd_send_case;
1718         int rc;
1719         __u32 routing = send_case & REMOTE_DST;
1720         struct lnet_rsp_tracker *rspt;
1721
1722         /*
1723          * Increment sequence number of the selected peer so that we
1724          * pick the next one in Round Robin.
1725          */
1726         best_lpni->lpni_seq++;
1727
1728         /*
1729          * grab a reference on the peer_ni so it sticks around even if
1730          * we need to drop and relock the lnet_net_lock below.
1731          */
1732         lnet_peer_ni_addref_locked(best_lpni);
1733
1734         /*
1735          * Use lnet_cpt_of_nid() to determine the CPT used to commit the
1736          * message. This ensures that we get a CPT that is correct for
1737          * the NI when the NI has been restricted to a subset of all CPTs.
1738          * If the selected CPT differs from the one currently locked, we
1739          * must unlock and relock the lnet_net_lock(), and then check whether
1740          * the configuration has changed. We don't have a hold on the best_ni
1741          * yet, and it may have vanished.
1742          */
1743         cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
1744         if (sd->sd_cpt != cpt2) {
1745                 __u32 seq = lnet_get_dlc_seq_locked();
1746                 lnet_net_unlock(sd->sd_cpt);
1747                 sd->sd_cpt = cpt2;
1748                 lnet_net_lock(sd->sd_cpt);
1749                 if (seq != lnet_get_dlc_seq_locked()) {
1750                         lnet_peer_ni_decref_locked(best_lpni);
1751                         return REPEAT_SEND;
1752                 }
1753         }
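        /*
         * REPEAT_SEND is handled in lnet_select_pathway(), which jumps
         * back to its "again" label and redoes the selection with the
         * new CPT locked.
         */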
1754
1755         /*
1756          * store the best_lpni in the message right away to avoid having
1757          * to do the same operation under different conditions
1758          */
1759         msg->msg_txpeer = best_lpni;
1760         msg->msg_txni = best_ni;
1761
1762         /*
1763          * grab a reference for the best_ni since now it's in use in this
1764          * send. The reference will be dropped in lnet_finalize()
1765          */
1766         lnet_ni_addref_locked(msg->msg_txni, sd->sd_cpt);
1767
1768         /*
1769          * Always set the target.nid to the best peer picked. The NID will
1770          * either be one of the peer NIDs selected, the same NID that was
1771          * originally set in the target, or the NID of a router if this
1772          * message should be routed
1773          */
1774         msg->msg_target.nid = msg->msg_txpeer->lpni_nid;
1775
1776         /*
1777          * lnet_msg_commit assigns the correct cpt to the message, which
1778          * is used to decrement the correct refcount on the ni when it's
1779          * time to return the credits
1780          */
1781         lnet_msg_commit(msg, sd->sd_cpt);
1782
1783         /*
1784          * If we are routing the message then we keep the src_nid that was
1785          * set by the originator. If we are not routing then we are the
1786          * originator and set it here.
1787          */
1788         if (!msg->msg_routing)
1789                 msg->msg_hdr.src_nid = cpu_to_le64(msg->msg_txni->ni_nid);
1790
1791         if (routing) {
1792                 msg->msg_target_is_router = 1;
1793                 msg->msg_target.pid = LNET_PID_LUSTRE;
1794                 /*
1795                  * since we're routing we want to ensure that the
1796                  * msg_hdr.dest_nid is set to the final destination. When
1797                  * the router receives this message it knows how to route
1798                  * it.
1799                  *
1800                  * final_dst_lpni is set at the beginning of the
1801                  * lnet_select_pathway() function and is never changed.
1802                  * It's safe to use it here.
1803                  */
1804                 msg->msg_hdr.dest_nid = cpu_to_le64(final_dst_lpni->lpni_nid);
1805         } else {
1806                 /*
1807                  * if we're not routing set the dest_nid to the best peer
1808                  * ni NID that we picked earlier in the algorithm.
1809                  */
1810                 msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
1811         }
1812
1813         /*
1814          * if we have a response tracker block, update it with the next
1815          * hop nid
1816          */
1817         if (msg->msg_md) {
1818                 rspt = msg->msg_md->md_rspt_ptr;
1819                 if (rspt) {
1820                         rspt->rspt_next_hop_nid = msg->msg_txpeer->lpni_nid;
1821                         CDEBUG(D_NET, "rspt_next_hop_nid = %s\n",
1822                                libcfs_nid2str(rspt->rspt_next_hop_nid));
1823                 }
1824         }
1825
1826         rc = lnet_post_send_locked(msg, 0);
1827
1828         if (!rc)
1829                 CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s try# %d\n",
1830                        libcfs_nid2str(msg->msg_hdr.src_nid),
1831                        libcfs_nid2str(msg->msg_txni->ni_nid),
1832                        libcfs_nid2str(sd->sd_src_nid),
1833                        libcfs_nid2str(msg->msg_hdr.dest_nid),
1834                        libcfs_nid2str(sd->sd_dst_nid),
1835                        libcfs_nid2str(msg->msg_txpeer->lpni_nid),
1836                        lnet_msgtyp2str(msg->msg_type), msg->msg_retry_count);
1837
1838         return rc;
1839 }
1840
1841 static inline void
1842 lnet_set_non_mr_pref_nid(struct lnet_send_data *sd)
1843 {
1844         if (sd->sd_send_case & NMR_DST &&
1845             sd->sd_msg->msg_type != LNET_MSG_REPLY &&
1846             sd->sd_msg->msg_type != LNET_MSG_ACK &&
1847             sd->sd_best_lpni->lpni_pref_nnids == 0) {
1848                 CDEBUG(D_NET, "Setting preferred local NID %s on NMR peer %s\n",
1849                        libcfs_nid2str(sd->sd_best_ni->ni_nid),
1850                        libcfs_nid2str(sd->sd_best_lpni->lpni_nid));
1851                 lnet_peer_ni_set_non_mr_pref_nid(sd->sd_best_lpni,
1852                                                  sd->sd_best_ni->ni_nid);
1853         }
1854 }
1855
1856 /*
1857  * Source Specified
1858  * Local Destination
1859  * non-mr peer
1860  *
1861  * use the source and destination NIDs as the pathway
1862  */
1863 static int
1864 lnet_handle_spec_local_nmr_dst(struct lnet_send_data *sd)
1865 {
1866         /* the destination lpni is set before we get here. */
1867
1868         /* find local NI */
1869         sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
1870         if (!sd->sd_best_ni) {
1871                 CERROR("Can't send to %s: src %s is not a "
1872                        "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
1873                                 libcfs_nid2str(sd->sd_src_nid));
1874                 return -EINVAL;
1875         }
1876
1877         /*
1878          * the preferred NID will only be set for NMR peers
1879          */
1880         lnet_set_non_mr_pref_nid(sd);
1881
1882         return lnet_handle_send(sd);
1883 }
1884
1885 /*
1886  * Source Specified
1887  * Local Destination
1888  * MR Peer
1889  *
1890  * Run the selection algorithm on the peer NIs unless we're sending
1891  * a response, in which case just send to the destination
1892  */
1893 static int
1894 lnet_handle_spec_local_mr_dst(struct lnet_send_data *sd)
1895 {
1896         sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
1897         if (!sd->sd_best_ni) {
1898                 CERROR("Can't send to %s: src %s is not a "
1899                        "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
1900                                 libcfs_nid2str(sd->sd_src_nid));
1901                 return -EINVAL;
1902         }
1903
1904         /*
1905          * only run the selection algorithm to pick the peer_ni if we're
1906          * sending a GET or a PUT. Responses are sent to the same
1907          * destination NID provided.
1908          */
1909         if (!(sd->sd_send_case & SND_RESP)) {
1910                 sd->sd_best_lpni =
1911                   lnet_find_best_lpni_on_net(sd, sd->sd_peer,
1912                                              sd->sd_best_ni->ni_net->net_id);
1913         }
1914
1915         if (sd->sd_best_lpni &&
1916             sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid)
1917                 return lnet_handle_lo_send(sd);
1918         else if (sd->sd_best_lpni)
1919                 return lnet_handle_send(sd);
1920
1921         CERROR("can't send to %s. no NI on %s\n",
1922                libcfs_nid2str(sd->sd_dst_nid),
1923                libcfs_net2str(sd->sd_best_ni->ni_net->net_id));
1924
1925         return -EHOSTUNREACH;
1926 }
1927
1928 struct lnet_ni *
1929 lnet_find_best_ni_on_spec_net(struct lnet_ni *cur_best_ni,
1930                               struct lnet_peer *peer,
1931                               struct lnet_peer_net *peer_net,
1932                               int cpt,
1933                               bool incr_seq)
1934 {
1935         struct lnet_net *local_net;
1936         struct lnet_ni *best_ni;
1937
1938         local_net = lnet_get_net_locked(peer_net->lpn_net_id);
1939         if (!local_net)
1940                 return NULL;
1941
1942         /*
1943          * Iterate through the NIs in this local Net and select
1944          * the NI to send from. The selection is determined by
1945          * these 3 criteria in the following priority:
1946          *      1. NUMA
1947          *      2. NI available credits
1948          *      3. Round Robin
1949          */
1950         best_ni = lnet_get_best_ni(local_net, cur_best_ni,
1951                                    peer, peer_net, cpt);
1952
1953         if (incr_seq && best_ni)
1954                 best_ni->ni_seq++;
1955
1956         return best_ni;
1957 }
1958
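/*
 * Kick off discovery of the peer owning @lpni if this message is allowed
 * to trigger discovery and the peer is not already up to date. Returns 0
 * if the send can proceed immediately, LNET_DC_WAIT if the message was
 * queued on the peer's lp_dc_pendq pending discovery, or a non-zero
 * error from lnet_discover_peer_locked().
 */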
1959 static int
1960 lnet_initiate_peer_discovery(struct lnet_peer_ni *lpni,
1961                              struct lnet_msg *msg, lnet_nid_t rtr_nid,
1962                              int cpt)
1963 {
1964         struct lnet_peer *peer;
1965         lnet_nid_t primary_nid;
1966         int rc;
1967
1968         lnet_peer_ni_addref_locked(lpni);
1969
1970         peer = lpni->lpni_peer_net->lpn_peer;
1971
1972         if (lnet_peer_gw_discovery(peer)) {
1973                 lnet_peer_ni_decref_locked(lpni);
1974                 return 0;
1975         }
1976
1977         if (!lnet_msg_discovery(msg) || lnet_peer_is_uptodate(peer)) {
1978                 lnet_peer_ni_decref_locked(lpni);
1979                 return 0;
1980         }
1981
1982         rc = lnet_discover_peer_locked(lpni, cpt, false);
1983         if (rc) {
1984                 lnet_peer_ni_decref_locked(lpni);
1985                 return rc;
1986         }
1987         /* The peer may have changed. */
1988         peer = lpni->lpni_peer_net->lpn_peer;
1989         /* queue message and return */
1990         msg->msg_rtr_nid_param = rtr_nid;
1991         msg->msg_sending = 0;
1992         msg->msg_txpeer = NULL;
1993         spin_lock(&peer->lp_lock);
1994         list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
1995         spin_unlock(&peer->lp_lock);
1996         lnet_peer_ni_decref_locked(lpni);
1997         primary_nid = peer->lp_primary_nid;
1998
1999         CDEBUG(D_NET, "msg %p delayed. %s pending discovery\n",
2000                 msg, libcfs_nid2str(primary_nid));
2001
2002         return LNET_DC_WAIT;
2003 }
2004
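/*
 * Select a routed path to @dst_nid: pick the reachable remote peer net
 * with the lowest sequence number, find the best route and gateway for
 * it, and trigger gateway discovery if needed (which may return
 * LNET_DC_WAIT). On success *gw_lpni and *gw_peer describe the gateway
 * to send to, and sd->sd_best_ni the local NI to send from.
 */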
2005 static int
2006 lnet_handle_find_routed_path(struct lnet_send_data *sd,
2007                              lnet_nid_t dst_nid,
2008                              struct lnet_peer_ni **gw_lpni,
2009                              struct lnet_peer **gw_peer)
2010 {
2011         int rc;
2012         struct lnet_peer *gw;
2013         struct lnet_peer *lp;
2014         struct lnet_peer_net *lpn;
2015         struct lnet_peer_net *best_lpn = NULL;
2016         struct lnet_remotenet *rnet;
2017         struct lnet_route *best_route;
2018         struct lnet_route *last_route;
2019         struct lnet_peer_ni *lpni = NULL;
2020         struct lnet_peer_ni *gwni = NULL;
2021         lnet_nid_t src_nid = sd->sd_src_nid;
2022
2023         /* we've already looked up the initial lpni using dst_nid */
2024         lpni = sd->sd_best_lpni;
2025         /* the peer tree must be in existence */
2026         LASSERT(lpni && lpni->lpni_peer_net && lpni->lpni_peer_net->lpn_peer);
2027         lp = lpni->lpni_peer_net->lpn_peer;
2028
2029         list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
2030                 /* is this remote network reachable?  */
2031                 rnet = lnet_find_rnet_locked(lpn->lpn_net_id);
2032                 if (!rnet)
2033                         continue;
2034
2035                 if (!best_lpn)
2036                         best_lpn = lpn;
2037
2038                 if (best_lpn->lpn_seq <= lpn->lpn_seq)
2039                         continue;
2040
2041                 best_lpn = lpn;
2042         }
2043
2044         if (!best_lpn) {
2045                 CERROR("peer %s has no available nets\n",
2046                        libcfs_nid2str(sd->sd_dst_nid));
2047                 return -EHOSTUNREACH;
2048         }
2049
2050         sd->sd_best_lpni = lnet_find_best_lpni_on_net(sd, lp, best_lpn->lpn_net_id);
2051         if (!sd->sd_best_lpni) {
2052                 CERROR("peer %s down\n", libcfs_nid2str(sd->sd_dst_nid));
2053                 return -EHOSTUNREACH;
2054         }
2055
2056         best_route = lnet_find_route_locked(NULL, best_lpn->lpn_net_id,
2057                                             sd->sd_rtr_nid, &last_route,
2058                                             &gwni);
2059         if (!best_route) {
2060                 CERROR("no route to %s from %s\n",
2061                        libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
2062                 return -EHOSTUNREACH;
2063         }
2064
2065         if (!gwni) {
2066                 CERROR("Internal Error. Route expected to %s from %s\n",
2067                         libcfs_nid2str(dst_nid),
2068                         libcfs_nid2str(src_nid));
2069                 return -EFAULT;
2070         }
2071
2072         gw = best_route->lr_gateway;
2073         LASSERT(gw == gwni->lpni_peer_net->lpn_peer);
2074
2075         /*
2076          * Discover this gateway if it hasn't already been discovered.
2077          * This means we might delay the message until discovery has
2078          * completed
2079          */
2080         sd->sd_msg->msg_src_nid_param = sd->sd_src_nid;
2081         rc = lnet_initiate_peer_discovery(gwni, sd->sd_msg, sd->sd_rtr_nid,
2082                                           sd->sd_cpt);
2083         if (rc)
2084                 return rc;
2085
2086         if (!sd->sd_best_ni)
2087                 sd->sd_best_ni = lnet_find_best_ni_on_spec_net(NULL, gw,
2088                                         lnet_peer_get_net_locked(gw,
2089                                                 best_route->lr_lnet),
2090                                         sd->sd_md_cpt,
2091                                         true);
2092
2093         if (!sd->sd_best_ni) {
2094                 CERROR("Internal Error. Expected local ni on %s "
2095                        "but none found: %s\n",
2096                        libcfs_net2str(best_route->lr_lnet),
2097                        libcfs_nid2str(sd->sd_src_nid));
2098                 return -EFAULT;
2099         }
2100
2101         *gw_lpni = gwni;
2102         *gw_peer = gw;
2103
2104         /*
2105          * increment the sequence numbers since now we're sure we're
2106          * going to use this path
2107          */
2108         LASSERT(best_route && last_route);
2109         best_route->lr_seq = last_route->lr_seq + 1;
2110         best_lpn->lpn_seq++;
2111
2112         return 0;
2113 }
2114
2115 /*
2116  * Handle two cases:
2117  *
2118  * Case 1:
2119  *  Source specified
2120  *  Remote destination
2121  *  Non-MR destination
2122  *
2123  * Case 2:
2124  *  Source specified
2125  *  Remote destination
2126  *  MR destination
2127  *
2128  * The handling of these two cases is similar. Even though the destination
2129  * can be MR or non-MR, we'll deal directly with the router.
2130  */
2131 static int
2132 lnet_handle_spec_router_dst(struct lnet_send_data *sd)
2133 {
2134         int rc;
2135         struct lnet_peer_ni *gw_lpni = NULL;
2136         struct lnet_peer *gw_peer = NULL;
2137
2138         /* find local NI */
2139         sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
2140         if (!sd->sd_best_ni) {
2141                 CERROR("Can't send to %s: src %s is not a "
2142                        "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
2143                                 libcfs_nid2str(sd->sd_src_nid));
2144                 return -EINVAL;
2145         }
2146
2147         rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
2148                                      &gw_peer);
2149         if (rc)
2150                 return rc;
2151
2152         if (sd->sd_send_case & NMR_DST)
2153                 /*
2154                  * since the final destination is non-MR, let's set its
2155                  * preferred NID before we send
2156                  */
2157                 lnet_set_non_mr_pref_nid(sd);
2158
2159         /*
2160          * We're going to send to the gw found so let's set its
2161          * info
2162          */
2163         sd->sd_peer = gw_peer;
2164         sd->sd_best_lpni = gw_lpni;
2165
2166         return lnet_handle_send(sd);
2167 }
2168
2169 struct lnet_ni *
2170 lnet_find_best_ni_on_local_net(struct lnet_peer *peer, int md_cpt,
2171                                bool discovery)
2172 {
2173         struct lnet_peer_net *peer_net = NULL;
2174         struct lnet_ni *best_ni = NULL;
2175
2176         /*
2177          * The peer can have multiple interfaces, some of them can be on
2178          * the local network and others on a routed network. We should
2179          * prefer the local network. However if the local network is not
2180          * available then we need to try the routed network
2181          */
2182
2183         /* go through all the peer nets and find the best_ni */
2184         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
2185                 /*
2186                  * The peer's list of nets can contain non-local nets. We
2187                  * want to only examine the local ones.
2188                  */
2189                 if (!lnet_get_net_locked(peer_net->lpn_net_id))
2190                         continue;
2191                 best_ni = lnet_find_best_ni_on_spec_net(best_ni, peer,
2192                                                    peer_net, md_cpt, false);
2193
2194                 /*
2195                  * if this is a discovery message and lp_disc_net_id is
2196                  * specified then use that net to send the discovery on.
2197                  */
2198                 if (peer->lp_disc_net_id == peer_net->lpn_net_id &&
2199                     discovery)
2200                         break;
2201         }
2202
2203         if (best_ni)
2204                 /* increment sequence number so we can round robin */
2205                 best_ni->ni_seq++;
2206
2207         return best_ni;
2208 }
2209
2210 static struct lnet_ni *
2211 lnet_find_existing_preferred_best_ni(struct lnet_send_data *sd)
2212 {
2213         struct lnet_ni *best_ni = NULL;
2214         struct lnet_peer_net *peer_net;
2215         struct lnet_peer *peer = sd->sd_peer;
2216         struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
2217         struct lnet_peer_ni *lpni;
2218         int cpt = sd->sd_cpt;
2219
2220         /*
2221          * We must use a consistent source address when sending to a
2222          * non-MR peer. However, a non-MR peer can have multiple NIDs
2223          * on multiple networks, and we may even need to talk to this
2224          * peer on multiple networks -- certain types of
2225          * load-balancing configuration do this.
2226          *
2227          * So we need to pick the NI the peer prefers for this
2228          * particular network.
2229          */
2230
2231         /* Get the target peer_ni */
2232         peer_net = lnet_peer_get_net_locked(peer,
2233                         LNET_NIDNET(best_lpni->lpni_nid));
2234         LASSERT(peer_net != NULL);
2235         list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
2236                                 lpni_peer_nis) {
2237                 if (lpni->lpni_pref_nnids == 0)
2238                         continue;
2239                 LASSERT(lpni->lpni_pref_nnids == 1);
2240                 best_ni = lnet_nid2ni_locked(
2241                                 lpni->lpni_pref.nid, cpt);
2242                 break;
2243         }
2244
2245         return best_ni;
2246 }
2247
2248 /* Prerequisite: sd->sd_peer and sd->sd_best_lpni should be set */
2249 static int
2250 lnet_select_preferred_best_ni(struct lnet_send_data *sd)
2251 {
2252         struct lnet_ni *best_ni = NULL;
2253         struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
2254
2255         /*
2256          * We must use a consistent source address when sending to a
2257          * non-MR peer. However, a non-MR peer can have multiple NIDs
2258          * on multiple networks, and we may even need to talk to this
2259          * peer on multiple networks -- certain types of
2260          * load-balancing configuration do this.
2261          *
2262          * So we need to pick the NI the peer prefers for this
2263          * particular network.
2264          */
2265
2266         best_ni = lnet_find_existing_preferred_best_ni(sd);
2267
2268         /* if best_ni is still not set just pick one */
2269         if (!best_ni) {
2270                 best_ni =
2271                   lnet_find_best_ni_on_spec_net(NULL, sd->sd_peer,
2272                                                 sd->sd_best_lpni->lpni_peer_net,
2273                                                 sd->sd_md_cpt, true);
2274                 /* If there is no best_ni we don't have a route */
2275                 if (!best_ni) {
2276                         CERROR("no path to %s from net %s\n",
2277                                 libcfs_nid2str(best_lpni->lpni_nid),
2278                                 libcfs_net2str(best_lpni->lpni_net->net_id));
2279                         return -EHOSTUNREACH;
2280                 }
2281         }
2282
2283         sd->sd_best_ni = best_ni;
2284
2285         /* Set preferred NI if necessary. */
2286         lnet_set_non_mr_pref_nid(sd);
2287
2288         return 0;
2289 }
2290
2291
2292 /*
2293  * Source not specified
2294  * Local destination
2295  * Non-MR Peer
2296  *
2297  * always use the same source NID for NMR peers
2298  * If we've talked to that peer before then we already have a preferred
2299  * source NI associated with it. Otherwise, we select a preferred local NI
2300  * and store it in the peer
2301  */
2302 static int
2303 lnet_handle_any_local_nmr_dst(struct lnet_send_data *sd)
2304 {
2305         int rc;
2306
2307         /* sd->sd_best_lpni is already set to the final destination */
2308
2309         /*
2310          * At this point we should've created the peer ni and peer. If we
2311          * can't find it, then something went wrong. Instead of asserting,
2312          * output a relevant message and fail the send
2313          */
2314         if (!sd->sd_best_lpni) {
2315                 CERROR("Internal fault. Unable to send msg %s to %s. "
2316                        "NID not known\n",
2317                        lnet_msgtyp2str(sd->sd_msg->msg_type),
2318                        libcfs_nid2str(sd->sd_dst_nid));
2319                 return -EFAULT;
2320         }
2321
2322         rc = lnet_select_preferred_best_ni(sd);
2323         if (!rc)
2324                 rc = lnet_handle_send(sd);
2325
2326         return rc;
2327 }
2328
2329 static int
2330 lnet_handle_any_mr_dsta(struct lnet_send_data *sd)
2331 {
2332         /*
2333          * NOTE we've already handled the remote peer case. So we only
2334          * need to worry about the local case here.
2335          *
2336          * if we're sending a response, ACK or reply, we need to send it
2337          * to the destination NID given to us. At this point we already
2338          * have the peer_ni we're supposed to send to, so just find the
2339          * best_ni on the peer net and use that. Since we're sending to an
2340          * MR peer then we can just run the selection algorithm on our
2341          * local NIs and pick the best one.
2342          */
2343         if (sd->sd_send_case & SND_RESP) {
2344                 sd->sd_best_ni =
2345                   lnet_find_best_ni_on_spec_net(NULL, sd->sd_peer,
2346                                                 sd->sd_best_lpni->lpni_peer_net,
2347                                                 sd->sd_md_cpt, true);
2348
2349                 if (!sd->sd_best_ni) {
2350                         /*
2351                          * We're not going to deal with being unable to send
2352                          * a response to the provided final destination
2353                          */
2354                         CERROR("Can't send response to %s. "
2355                                "No local NI available\n",
2356                                 libcfs_nid2str(sd->sd_dst_nid));
2357                         return -EHOSTUNREACH;
2358                 }
2359
2360                 return lnet_handle_send(sd);
2361         }
2362
2363         /*
2364          * If we get here that means we're sending a fresh request, PUT or
2365          * GET, so we need to run our standard selection algorithm.
2366          * First find the best local interface that's on any of the peer's
2367          * networks.
2368          */
2369         sd->sd_best_ni = lnet_find_best_ni_on_local_net(sd->sd_peer,
2370                                         sd->sd_md_cpt,
2371                                         lnet_msg_discovery(sd->sd_msg));
2372         if (sd->sd_best_ni) {
2373                 sd->sd_best_lpni =
2374                   lnet_find_best_lpni_on_net(sd, sd->sd_peer,
2375                                              sd->sd_best_ni->ni_net->net_id);
2376
2377                 /*
2378                  * if we're successful in selecting a peer_ni on the local
2379                  * network, then send to it. Otherwise fall through and
2380                  * try and see if we can reach it over another routed
2381                  * network
2382                  */
2383                 if (sd->sd_best_lpni &&
2384                     sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid) {
2385                         /*
2386                          * in case we initially started with a routed
2387                          * destination, let's reset to local
2388                          */
2389                         sd->sd_send_case &= ~REMOTE_DST;
2390                         sd->sd_send_case |= LOCAL_DST;
2391                         return lnet_handle_lo_send(sd);
2392                 } else if (sd->sd_best_lpni) {
2393                         /*
2394                          * in case we initially started with a routed
2395                          * destination, let's reset to local
2396                          */
2397                         sd->sd_send_case &= ~REMOTE_DST;
2398                         sd->sd_send_case |= LOCAL_DST;
2399                         return lnet_handle_send(sd);
2400                 }
2401
2402                 CERROR("Internal Error. Expected to have a best_lpni: "
2403                        "%s -> %s\n",
2404                        libcfs_nid2str(sd->sd_src_nid),
2405                        libcfs_nid2str(sd->sd_dst_nid));
2406
2407                 return -EFAULT;
2408         }
2409
2410         /*
2411          * Peer doesn't have a local network. Let's see if there is
2412          * a remote network we can reach it on.
2413          */
2414         return PASS_THROUGH;
2415 }
2416
2417 /*
2418  * Case 1:
2419  *      Source NID not specified
2420  *      Local destination
2421  *      MR peer
2422  *
2423  * Case 2:
2424  *      Source NID not specified
2425  *      Remote destination
2426  *      MR peer
2427  *
2428  * In both of these cases if we're sending a response, ACK or REPLY, then
2429  * we need to send to the destination NID provided.
2430  *
2431  * In the remote case let's deal with MR routers.
2432  *
2433  */
2434
2435 static int
2436 lnet_handle_any_mr_dst(struct lnet_send_data *sd)
2437 {
2438         int rc = 0;
2439         struct lnet_peer *gw_peer = NULL;
2440         struct lnet_peer_ni *gw_lpni = NULL;
2441
2442         /*
2443          * handle sending a response to a remote peer here so we don't
2444          * have to worry about it if we hit lnet_handle_any_mr_dsta()
2445          */
2446         if (sd->sd_send_case & REMOTE_DST &&
2447             sd->sd_send_case & SND_RESP) {
2448                 struct lnet_peer_ni *gw;
2449                 struct lnet_peer *gw_peer;
2450
2451                 rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw,
2452                                                   &gw_peer);
2453                 if (rc < 0) {
2454                         CERROR("Can't send response to %s. "
2455                                "No route available\n",
2456                                 libcfs_nid2str(sd->sd_dst_nid));
2457                         return -EHOSTUNREACH;
2458                 } else if (rc > 0) {
2459                         return rc;
2460                 }
2461
2462                 sd->sd_best_lpni = gw;
2463                 sd->sd_peer = gw_peer;
2464
2465                 return lnet_handle_send(sd);
2466         }
2467
2468         /*
2469          * Even though the NID for the peer might not be on a local network,
2470          * since the peer is MR there could be other interfaces on the
2471          * local network. In that case we'd still like to prefer the local
2472          * network over the routed network. If we're unable to do that
2473          * then we select the best router among the different routed networks,
2474          * and if the router is MR then we can deal with it as such.
2475          */
2476         rc = lnet_handle_any_mr_dsta(sd);
2477         if (rc != PASS_THROUGH)
2478                 return rc;
2479
2480         /*
2481          * Now that we must route to the destination, we must consider the
2482          * MR case, where the destination has multiple interfaces, some of
2483          * which we can route to and others we do not. For this reason we
2484          * need to select the destination which we can route to and if
2485          * there are multiple, we need to round robin.
2486          */
2487         rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
2488                                           &gw_peer);
2489         if (rc)
2490                 return rc;
2491
2492         sd->sd_send_case &= ~LOCAL_DST;
2493         sd->sd_send_case |= REMOTE_DST;
2494
2495         sd->sd_peer = gw_peer;
2496         sd->sd_best_lpni = gw_lpni;
2497
2498         return lnet_handle_send(sd);
2499 }
2500
2501 /*
2502  * Source not specified
2503  * Remote destination
2504  * Non-MR peer
2505  *
2506  * Must send to the specified peer NID using the same source NID that
2507  * we've used before. If it's the first time to talk to that peer then
2508  * find the source NI and assign it as preferred to that peer
2509  */
2510 static int
2511 lnet_handle_any_router_nmr_dst(struct lnet_send_data *sd)
2512 {
2513         int rc;
2514         struct lnet_peer_ni *gw_lpni = NULL;
2515         struct lnet_peer *gw_peer = NULL;
2516
2517         /*
2518          * Let's set if we have a preferred NI to talk to this NMR peer
2519          */
2520         sd->sd_best_ni = lnet_find_existing_preferred_best_ni(sd);
2521
2522         /*
2523          * find the router and that'll find the best NI if we didn't find
2524          * it already.
2525          */
2526         rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
2527                                           &gw_peer);
2528         if (rc)
2529                 return rc;
2530
2531         /*
2532          * set the best_ni we've chosen as the preferred one for
2533          * this peer
2534          */
2535         lnet_set_non_mr_pref_nid(sd);
2536
2537         /* we'll be sending to the gw */
2538         sd->sd_best_lpni = gw_lpni;
2539         sd->sd_peer = gw_peer;
2540
2541         return lnet_handle_send(sd);
2542 }
2543
2544 static int
2545 lnet_handle_send_case_locked(struct lnet_send_data *sd)
2546 {
2547         /*
2548          * turn off the SND_RESP bit.
2549          * It will be checked in the case handling
2550          */
2551         __u32 send_case = sd->sd_send_case &= ~SND_RESP;
2552
2553         CDEBUG(D_NET, "Source %s%s to %s %s %s destination\n",
2554                 (send_case & SRC_SPEC) ? "Specified: " : "ANY",
2555                 (send_case & SRC_SPEC) ? libcfs_nid2str(sd->sd_src_nid) : "",
2556                 (send_case & MR_DST) ? "MR: " : "NMR: ",
2557                 libcfs_nid2str(sd->sd_dst_nid),
2558                 (send_case & LOCAL_DST) ? "local" : "routed");
2559
2560         switch (send_case) {
2561         /*
2562          * For all cases where the source is specified, we should always
2563          * use the destination NID, whether it's an MR destination or not,
2564          * since we're continuing a series of related messages for the
2565          * same RPC
2566          */
2567         case SRC_SPEC_LOCAL_NMR_DST:
2568                 return lnet_handle_spec_local_nmr_dst(sd);
2569         case SRC_SPEC_LOCAL_MR_DST:
2570                 return lnet_handle_spec_local_mr_dst(sd);
2571         case SRC_SPEC_ROUTER_NMR_DST:
2572         case SRC_SPEC_ROUTER_MR_DST:
2573                 return lnet_handle_spec_router_dst(sd);
2574         case SRC_ANY_LOCAL_NMR_DST:
2575                 return lnet_handle_any_local_nmr_dst(sd);
2576         case SRC_ANY_LOCAL_MR_DST:
2577         case SRC_ANY_ROUTER_MR_DST:
2578                 return lnet_handle_any_mr_dst(sd);
2579         case SRC_ANY_ROUTER_NMR_DST:
2580                 return lnet_handle_any_router_nmr_dst(sd);
2581         default:
2582                 CERROR("Unknown send case\n");
2583                 return -1;
2584         }
2585 }
2586
2587 static int
2588 lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
2589                     struct lnet_msg *msg, lnet_nid_t rtr_nid)
2590 {
2591         struct lnet_peer_ni     *lpni;
2592         struct lnet_peer        *peer;
2593         struct lnet_send_data   send_data;
2594         int                     cpt, rc;
2595         int                     md_cpt;
2596         __u32                   send_case = 0;
2597
2598         memset(&send_data, 0, sizeof(send_data));
2599
2600         /*
2601          * get an initial CPT to use for locking. The idea here is not to
2602          * serialize the calls to select_pathway, so that as many
2603          * operations as possible can run concurrently. To do that we use
2604          * the CPT where this call is being executed. Later on when we
2605          * determine the CPT to use in lnet_msg_commit, we switch the
2606          * lock and check if there was any configuration change. If none,
2607          * we proceed; if there is, we restart the operation.
2608          */
2609         cpt = lnet_net_lock_current();
2610
2611         md_cpt = lnet_cpt_of_md(msg->msg_md, msg->msg_offset);
2612         if (md_cpt == CFS_CPT_ANY)
2613                 md_cpt = cpt;
2614
2615 again:
2616
2617         /*
2618          * If we're being asked to send to the loopback interface, there
2619          * is no need to go through any selection. We can just shortcut
2620          * the entire process and send over lolnd
2621          */
2622         send_data.sd_msg = msg;
2623         send_data.sd_cpt = cpt;
2624         if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) {
2625                 rc = lnet_handle_lo_send(&send_data);
2626                 lnet_net_unlock(cpt);
2627                 return rc;
2628         }
2629
2630         /*
2631          * find an existing peer_ni, or create one and mark it as having been
2632          * created due to network traffic. This call will create the
2633          * peer->peer_net->peer_ni tree.
2634          */
2635         lpni = lnet_nid2peerni_locked(dst_nid, LNET_NID_ANY, cpt);
2636         if (IS_ERR(lpni)) {
2637                 lnet_net_unlock(cpt);
2638                 return PTR_ERR(lpni);
2639         }
2640
2641         /*
2642          * Cache the original src_nid. If we need to resend the message
2643          * then we'll need to know whether the src_nid was originally
2644          * specified for this message. If it was originally specified,
2645          * then we need to keep using the same src_nid since it's
2646          * continuing the same sequence of messages.
2647          */
2648         msg->msg_src_nid_param = src_nid;
2649
2650         /*
2651          * Now that we have a peer_ni, check if we want to discover
2652          * the peer. Traffic to the LNET_RESERVED_PORTAL should not
2653          * trigger discovery.
2654          */
2655         peer = lpni->lpni_peer_net->lpn_peer;
2656         rc = lnet_initiate_peer_discovery(lpni, msg, rtr_nid, cpt);
2657         if (rc) {
2658                 lnet_peer_ni_decref_locked(lpni);
2659                 lnet_net_unlock(cpt);
2660                 return rc;
2661         }
2662         lnet_peer_ni_decref_locked(lpni);
2663
2664         /*
2665          * Identify the different send cases
2666          */
2667         if (src_nid == LNET_NID_ANY)
2668                 send_case |= SRC_ANY;
2669         else
2670                 send_case |= SRC_SPEC;
2671
2672         if (lnet_get_net_locked(LNET_NIDNET(dst_nid)))
2673                 send_case |= LOCAL_DST;
2674         else
2675                 send_case |= REMOTE_DST;
2676
2677         /*
2678          * if this is a non-MR peer or if we're recovering a peer ni then
2679          * let's consider this an NMR case so we can hit the destination
2680          * NID.
2681          */
2682         if (!lnet_peer_is_multi_rail(peer) || msg->msg_recovery)
2683                 send_case |= NMR_DST;
2684         else
2685                 send_case |= MR_DST;
2686
2687         if (msg->msg_type == LNET_MSG_REPLY ||
2688             msg->msg_type == LNET_MSG_ACK)
2689                 send_case |= SND_RESP;
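        /*
         * Illustrative example: an unsolicited PUT with src_nid ==
         * LNET_NID_ANY to a multi-rail peer whose NID is on a directly
         * connected network resolves to SRC_ANY_LOCAL_MR_DST and is
         * dispatched to lnet_handle_any_mr_dst() by
         * lnet_handle_send_case_locked() below.
         */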
2690
2691         /* assign parameters to the send_data */
2692         send_data.sd_rtr_nid = rtr_nid;
2693         send_data.sd_src_nid = src_nid;
2694         send_data.sd_dst_nid = dst_nid;
2695         send_data.sd_best_lpni = lpni;
2696         /*
2697          * keep a pointer to the final destination in case we're going to
2698          * route, so we'll need to access it later
2699          */
2700         send_data.sd_final_dst_lpni = lpni;
2701         send_data.sd_peer = peer;
2702         send_data.sd_md_cpt = md_cpt;
2703         send_data.sd_send_case = send_case;
2704
2705         rc = lnet_handle_send_case_locked(&send_data);
2706
2707         /*
2708          * Update the local cpt since send_data.sd_cpt might've been
2709          * updated as a result of calling lnet_handle_send_case_locked().
2710          */
2711         cpt = send_data.sd_cpt;
2712
2713         if (rc == REPEAT_SEND)
2714                 goto again;
2715
2716         lnet_net_unlock(cpt);
2717
2718         return rc;
2719 }
2720
2721 int
2722 lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
2723 {
2724         lnet_nid_t              dst_nid = msg->msg_target.nid;
2725         int                     rc;
2726
2727         /*
2728          * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
2729          * but we might want to use pre-determined router for ACK/REPLY
2730          * in the future
2731          */
2732         /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
2733         LASSERT(msg->msg_txpeer == NULL);
2734         LASSERT(msg->msg_txni == NULL);
2735         LASSERT(!msg->msg_sending);
2736         LASSERT(!msg->msg_target_is_router);
2737         LASSERT(!msg->msg_receiving);
2738
2739         msg->msg_sending = 1;
2740
2741         LASSERT(!msg->msg_tx_committed);
2742
2743         rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid);
2744         if (rc < 0) {
2745                 if (rc == -EHOSTUNREACH)
2746                         msg->msg_health_status = LNET_MSG_STATUS_REMOTE_ERROR;
2747                 else
2748                         msg->msg_health_status = LNET_MSG_STATUS_LOCAL_ERROR;
2749                 return rc;
2750         }
2751
2752         if (rc == LNET_CREDIT_OK)
2753                 lnet_ni_send(msg->msg_txni, msg);
2754
2755         /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */
2756         return 0;
2757 }
2758
2759 enum lnet_mt_event_type {
2760         MT_TYPE_LOCAL_NI = 0,
2761         MT_TYPE_PEER_NI
2762 };
2763
2764 struct lnet_mt_event_info {
2765         enum lnet_mt_event_type mt_type;
2766         lnet_nid_t mt_nid;
2767 };
2768
2769 /* called with res_lock held */
2770 void
2771 lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
2772 {
2773         struct lnet_rsp_tracker *rspt;
2774
2775         /*
2776          * msg has a refcount on the MD so the MD is not going away.
2777          * The rspt queue for the cpt is protected by
2778          * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
2779          */
2780         if (!md->md_rspt_ptr)
2781                 return;
2782
2783         rspt = md->md_rspt_ptr;
2784         md->md_rspt_ptr = NULL;
2785
2786         /* debug code */
2787         LASSERT(rspt->rspt_cpt == cpt);
2788
2789         /*
2790          * invalidate the handle to indicate that a response has been
2791          * received, which will then lead the monitor thread to clean up
2792          * the rspt block.
2793          */
2794         LNetInvalidateMDHandle(&rspt->rspt_mdh);
2795 }
2796
2797 static void
2798 lnet_finalize_expired_responses(bool force)
2799 {
2800         struct lnet_libmd *md;
2801         struct list_head local_queue;
2802         struct lnet_rsp_tracker *rspt, *tmp;
2803         int i;
2804
2805         if (the_lnet.ln_mt_rstq == NULL)
2806                 return;
2807
2808         cfs_cpt_for_each(i, lnet_cpt_table()) {
2809                 INIT_LIST_HEAD(&local_queue);
2810
2811                 lnet_net_lock(i);
2812                 if (!the_lnet.ln_mt_rstq[i]) {
2813                         lnet_net_unlock(i);
2814                         continue;
2815                 }
2816                 list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
2817                 lnet_net_unlock(i);
2818
2819                 list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
2820                         /*
2821                          * The rspt mdh will be invalidated when a response
2822                          * is received or whenever we want to discard the
2823                          * block. The monitor thread will walk the queue
2824                          * and clean up any rspts with an invalid mdh.
2825                          * The monitor thread only walks the queue up to
2826                          * the first unexpired rspt block. This means that
2827                          * some rspt blocks which received their
2828                          * corresponding responses will linger in the
2829                          * queue until they are eventually cleaned up.
2830                          */
2831                         lnet_res_lock(i);
2832                         if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
2833                                 lnet_res_unlock(i);
2834                                 list_del_init(&rspt->rspt_on_list);
2835                                 lnet_rspt_free(rspt, i);
2836                                 continue;
2837                         }
2838
2839                         if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 ||
2840                             force) {
2841                                 struct lnet_peer_ni *lpni;
2842                                 lnet_nid_t nid;
2843
2844                                 md = lnet_handle2md(&rspt->rspt_mdh);
2845                                 if (!md) {
2846                                         LNetInvalidateMDHandle(&rspt->rspt_mdh);
2847                                         lnet_res_unlock(i);
2848                                         list_del_init(&rspt->rspt_on_list);
2849                                         lnet_rspt_free(rspt, i);
2850                                         continue;
2851                                 }
2852                                 LASSERT(md->md_rspt_ptr == rspt);
2853                                 md->md_rspt_ptr = NULL;
2854                                 lnet_res_unlock(i);
2855
2856                                 lnet_net_lock(i);
2857                                 the_lnet.ln_counters[i]->lct_health.lch_response_timeout_count++;
2858                                 lnet_net_unlock(i);
2859
2860                                 list_del_init(&rspt->rspt_on_list);
2861
2862                                 nid = rspt->rspt_next_hop_nid;
2863
2864                                 CNETERR("Response timed out: md = %p: nid = %s\n",
2865                                         md, libcfs_nid2str(nid));
2866                                 LNetMDUnlink(rspt->rspt_mdh);
2867                                 lnet_rspt_free(rspt, i);
2868
2869                                 /*
2870                                  * If there is a timeout on the response
2871                                  * from the next hop, decrement its health
2872                                  * value so that we don't use it
2873                                  */
2874                                 lnet_net_lock(0);
2875                                 lpni = lnet_find_peer_ni_locked(nid);
2876                                 if (lpni) {
2877                                         lnet_handle_remote_failure_locked(lpni);
2878                                         lnet_peer_ni_decref_locked(lpni);
2879                                 }
2880                                 lnet_net_unlock(0);
2881                         } else {
2882                                 lnet_res_unlock(i);
2883                                 break;
2884                         }
2885                 }
2886
2887                 lnet_net_lock(i);
2888                 if (!list_empty(&local_queue))
2889                         list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
2890                 lnet_net_unlock(i);
2891         }
2892 }
2893
2894 static void
2895 lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
2896 {
2897         struct lnet_msg *msg;
2898
2899         while (!list_empty(resendq)) {
2900                 struct lnet_peer_ni *lpni;
2901
2902                 msg = list_entry(resendq->next, struct lnet_msg,
2903                                  msg_list);
2904
2905                 list_del_init(&msg->msg_list);
2906
2907                 lpni = lnet_find_peer_ni_locked(msg->msg_hdr.dest_nid);
2908                 if (!lpni) {
2909                         lnet_net_unlock(cpt);
2910                         CERROR("Expected that a peer is already created for %s\n",
2911                                libcfs_nid2str(msg->msg_hdr.dest_nid));
2912                         msg->msg_no_resend = true;
2913                         lnet_finalize(msg, -EFAULT);
2914                         lnet_net_lock(cpt);
2915                 } else {
2916                         struct lnet_peer *peer;
2917                         int rc;
2918                         lnet_nid_t src_nid = LNET_NID_ANY;
2919
2920                         /*
2921                          * If this message is not being routed and the
2922                          * peer is non-MR, then we must use the same
2923                          * src_nid that was used in the original send.
2924                          * Otherwise, if we're routing the message (i.e.
2925                          * we're a router), then we can use any of our
2926                          * local interfaces. It doesn't matter to the
2927                          * final destination.
2928                          */
2929                         peer = lpni->lpni_peer_net->lpn_peer;
2930                         if (!msg->msg_routing &&
2931                             !lnet_peer_is_multi_rail(peer))
2932                                 src_nid = le64_to_cpu(msg->msg_hdr.src_nid);
2933
2934                         /*
2935                          * If we originally specified a src NID, then we
2936                          * must attempt to reuse it in the resend as well.
2937                          */
2938                         if (msg->msg_src_nid_param != LNET_NID_ANY)
2939                                 src_nid = msg->msg_src_nid_param;
2940                         lnet_peer_ni_decref_locked(lpni);
2941
2942                         lnet_net_unlock(cpt);
2943                         CDEBUG(D_NET, "resending %s->%s: %s recovery %d try# %d\n",
2944                                libcfs_nid2str(src_nid),
2945                                libcfs_id2str(msg->msg_target),
2946                                lnet_msgtyp2str(msg->msg_type),
2947                                msg->msg_recovery,
2948                                msg->msg_retry_count);
2949                         rc = lnet_send(src_nid, msg, LNET_NID_ANY);
2950                         if (rc) {
2951                                 CERROR("Error sending %s to %s: %d\n",
2952                                        lnet_msgtyp2str(msg->msg_type),
2953                                        libcfs_id2str(msg->msg_target), rc);
2954                                 msg->msg_no_resend = true;
2955                                 lnet_finalize(msg, rc);
2956                         }
2957                         lnet_net_lock(cpt);
2958                         if (!rc)
2959                                 the_lnet.ln_counters[cpt]->lct_health.lch_resend_count++;
2960                 }
2961         }
2962 }
2963
2964 static void
2965 lnet_resend_pending_msgs(void)
2966 {
2967         int i;
2968
2969         cfs_cpt_for_each(i, lnet_cpt_table()) {
2970                 lnet_net_lock(i);
2971                 lnet_resend_pending_msgs_locked(the_lnet.ln_mt_resendqs[i], i);
2972                 lnet_net_unlock(i);
2973         }
2974 }
2975
2976 /* called with lnet_net_lock(cpt) and lnet_ni_lock(ni) held */
2977 static void
2978 lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt, bool force)
2979 {
2980         struct lnet_handle_md recovery_mdh;
2981
2982         LNetInvalidateMDHandle(&recovery_mdh);
2983
2984         if (ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING ||
2985             force) {
2986                 recovery_mdh = ni->ni_ping_mdh;
2987                 LNetInvalidateMDHandle(&ni->ni_ping_mdh);
2988         }
2989         lnet_ni_unlock(ni);
2990         lnet_net_unlock(cpt);
2991         if (!LNetMDHandleIsInvalid(recovery_mdh))
2992                 LNetMDUnlink(recovery_mdh);
2993         lnet_net_lock(cpt);
2994         lnet_ni_lock(ni);
2995 }
2996
2997 static void
2998 lnet_recover_local_nis(void)
2999 {
3000         struct lnet_mt_event_info *ev_info;
3001         struct list_head processed_list;
3002         struct list_head local_queue;
3003         struct lnet_handle_md mdh;
3004         struct lnet_ni *tmp;
3005         struct lnet_ni *ni;
3006         lnet_nid_t nid;
3007         int healthv;
3008         int rc;
3009
3010         INIT_LIST_HEAD(&local_queue);
3011         INIT_LIST_HEAD(&processed_list);
3012
3013         /*
3014          * Splice the recovery queue onto a local queue. We will iterate
3015          * through the local queue and update it as needed. Once we're
3016          * done with the traversal, we'll splice the local queue back onto
3017          * the head of ln_mt_localNIRecovq. Any newly added local NIs
3018          * will be traversed in the next iteration.
3019          */
3020         lnet_net_lock(0);
3021         list_splice_init(&the_lnet.ln_mt_localNIRecovq,
3022                          &local_queue);
3023         lnet_net_unlock(0);
3024
3025         list_for_each_entry_safe(ni, tmp, &local_queue, ni_recovery) {
3026                 /*
3027                  * if an NI is being deleted or it is now healthy, there
3028                  * is no need to keep it around in the recovery queue.
3029                  * The monitor thread is the only thread responsible for
3030                  * removing the NI from the recovery queue.
3031                  * Multiple threads can be adding NIs to the recovery
3032                  * queue.
3033                  */
3034                 healthv = atomic_read(&ni->ni_healthv);
3035
3036                 lnet_net_lock(0);
3037                 lnet_ni_lock(ni);
3038                 if (ni->ni_state != LNET_NI_STATE_ACTIVE ||
3039                     healthv == LNET_MAX_HEALTH_VALUE) {
3040                         list_del_init(&ni->ni_recovery);
3041                         lnet_unlink_ni_recovery_mdh_locked(ni, 0, false);
3042                         lnet_ni_unlock(ni);
3043                         lnet_ni_decref_locked(ni, 0);
3044                         lnet_net_unlock(0);
3045                         continue;
3046                 }
3047
3048                 /*
3049                  * If the local NI failed recovery, we must unlink the md.
3050                  * But we want to keep the local_ni on the recovery queue
3051                  * so we can continue the attempts to recover it.
3052                  */
3053                 if (ni->ni_recovery_state & LNET_NI_RECOVERY_FAILED) {
3054                         lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
3055                         ni->ni_recovery_state &= ~LNET_NI_RECOVERY_FAILED;
3056                 }
3057
3058                 lnet_ni_unlock(ni);
3059                 lnet_net_unlock(0);
3060
3061
3062                 CDEBUG(D_NET, "attempting to recover local ni: %s\n",
3063                        libcfs_nid2str(ni->ni_nid));
3064
3065                 lnet_ni_lock(ni);
3066                 if (!(ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING)) {
3067                         ni->ni_recovery_state |= LNET_NI_RECOVERY_PENDING;
3068                         lnet_ni_unlock(ni);
3069
3070                         LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
3071                         if (!ev_info) {
3072                                 CERROR("out of memory. Can't recover %s\n",
3073                                        libcfs_nid2str(ni->ni_nid));
3074                                 lnet_ni_lock(ni);
3075                                 ni->ni_recovery_state &=
3076                                   ~LNET_NI_RECOVERY_PENDING;
3077                                 lnet_ni_unlock(ni);
3078                                 continue;
3079                         }
3080
3081                         mdh = ni->ni_ping_mdh;
3082                         /*
3083                          * Invalidate the NI's mdh in case the NI gets
3084                          * deleted; in that case we unlink the saved mdh below.
3085                          */
3086                         LNetInvalidateMDHandle(&ni->ni_ping_mdh);
3087                         nid = ni->ni_nid;
3088
3089                         /*
3090                          * Remove the NI from the local queue and drop
3091                          * the reference count to it while we're
3092                          * recovering it. The reason is that the NI
3093                          * could be deleted, and the way the code is
3094                          * structured, if we don't drop the NI then
3095                          * the deletion code will loop waiting for the
3096                          * reference count to drop while holding the
3097                          * ln_mutex_lock(). When we look up the peer to
3098                          * send to in lnet_select_pathway() we will try
3099                          * to take the ln_mutex_lock() as well, leading
3100                          * to a deadlock. By dropping the refcount and
3101                          * removing the NI from the list, we allow it
3102                          * to be removed; we then use the cached NID to
3103                          * look it up again. If it's gone, we just
3104                          * continue examining the rest of the queue.
3105                          */
3106                         lnet_net_lock(0);
3107                         list_del_init(&ni->ni_recovery);
3108                         lnet_ni_decref_locked(ni, 0);
3109                         lnet_net_unlock(0);
3110
3111                         ev_info->mt_type = MT_TYPE_LOCAL_NI;
3112                         ev_info->mt_nid = nid;
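                        /*
                         * Ping the NI's own NID. Completion events are
                         * delivered to lnet_mt_event_handler() via the
                         * monitor thread EQ (ln_mt_eqh) passed below.
                         */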
3113                         rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
3114                                             ev_info, the_lnet.ln_mt_eqh, true);
3115                         /* lookup the nid again */
3116                         lnet_net_lock(0);
3117                         ni = lnet_nid2ni_locked(nid, 0);
3118                         if (!ni) {
3119                                 /*
3120                                  * the NI has been deleted when we dropped
3121                                  * the ref count
3122                                  */
3123                                 lnet_net_unlock(0);
3124                                 LNetMDUnlink(mdh);
3125                                 continue;
3126                         }
3127                         /*
3128                          * Same note as in lnet_recover_peer_nis(). While
3129                          * we're sending the ping, the NI is free to be
3130                          * deleted or manipulated. By this point it
3131                          * could've been added back on the recovery queue,
3132                          * and a refcount taken on it.
3133                          * So we can't just add it again blindly or we'll
3134                          * corrupt the queue. We must check under lock
3135                          * that it's not on any list and, if it isn't, add
3136                          * it to the processed list, which will eventually
3137                          * be spliced back onto the recovery queue.
3138                          */
3139                         ni->ni_ping_mdh = mdh;
3140                         if (list_empty(&ni->ni_recovery)) {
3141                                 list_add_tail(&ni->ni_recovery, &processed_list);
3142                                 lnet_ni_addref_locked(ni, 0);
3143                         }
3144                         lnet_net_unlock(0);
3145
3146                         lnet_ni_lock(ni);
3147                         if (rc)
3148                                 ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
3149                 }
3150                 lnet_ni_unlock(ni);
3151         }
3152
3153         /*
3154          * put back the remaining NIs on the ln_mt_localNIRecovq to be
3155          * reexamined in the next iteration.
3156          */
3157         list_splice_init(&processed_list, &local_queue);
3158         lnet_net_lock(0);
3159         list_splice(&local_queue, &the_lnet.ln_mt_localNIRecovq);
3160         lnet_net_unlock(0);
3161 }
3162
3163 static struct list_head **
3164 lnet_create_array_of_queues(void)
3165 {
3166         struct list_head **qs;
3167         struct list_head *q;
3168         int i;
3169
3170         qs = cfs_percpt_alloc(lnet_cpt_table(),
3171                               sizeof(struct list_head));
3172         if (!qs) {
3173                 CERROR("Failed to allocate queues\n");
3174                 return NULL;
3175         }
3176
3177         cfs_percpt_for_each(q, i, qs)
3178                 INIT_LIST_HEAD(q);
3179
3180         return qs;
3181 }
3182
3183 static int
3184 lnet_resendqs_create(void)
3185 {
3186         struct list_head **resendqs;
3187         resendqs = lnet_create_array_of_queues();
3188
3189         if (!resendqs)
3190                 return -ENOMEM;
3191
3192         lnet_net_lock(LNET_LOCK_EX);
3193         the_lnet.ln_mt_resendqs = resendqs;
3194         lnet_net_unlock(LNET_LOCK_EX);
3195
3196         return 0;
3197 }
3198
3199 static void
3200 lnet_clean_local_ni_recoveryq(void)
3201 {
3202         struct lnet_ni *ni;
3203
3204         /* This is only called when the monitor thread has stopped */
3205         lnet_net_lock(0);
3206
3207         while (!list_empty(&the_lnet.ln_mt_localNIRecovq)) {
3208                 ni = list_entry(the_lnet.ln_mt_localNIRecovq.next,
3209                                 struct lnet_ni, ni_recovery);
3210                 list_del_init(&ni->ni_recovery);
3211                 lnet_ni_lock(ni);
3212                 lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
3213                 lnet_ni_unlock(ni);
3214                 lnet_ni_decref_locked(ni, 0);
3215         }
3216
3217         lnet_net_unlock(0);
3218 }
3219
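/* called with lnet_net_lock(cpt) and lpni->lpni_lock held */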
3220 static void
3221 lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt,
3222                                      bool force)
3223 {
3224         struct lnet_handle_md recovery_mdh;
3225
3226         LNetInvalidateMDHandle(&recovery_mdh);
3227
3228         if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING || force) {
3229                 recovery_mdh = lpni->lpni_recovery_ping_mdh;
3230                 LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
3231         }
3232         spin_unlock(&lpni->lpni_lock);
3233         lnet_net_unlock(cpt);
3234         if (!LNetMDHandleIsInvalid(recovery_mdh))
3235                 LNetMDUnlink(recovery_mdh);
3236         lnet_net_lock(cpt);
3237         spin_lock(&lpni->lpni_lock);
3238 }
3239
3240 static void
3241 lnet_clean_peer_ni_recoveryq(void)
3242 {
3243         struct lnet_peer_ni *lpni, *tmp;
3244
3245         lnet_net_lock(LNET_LOCK_EX);
3246
3247         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_mt_peerNIRecovq,
3248                                  lpni_recovery) {
3249                 list_del_init(&lpni->lpni_recovery);
3250                 spin_lock(&lpni->lpni_lock);
3251                 lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX, true);
3252                 spin_unlock(&lpni->lpni_lock);
3253                 lnet_peer_ni_decref_locked(lpni);
3254         }
3255
3256         lnet_net_unlock(LNET_LOCK_EX);
3257 }
3258
3259 static void
3260 lnet_clean_resendqs(void)
3261 {
3262         struct lnet_msg *msg, *tmp;
3263         struct list_head msgs;
3264         int i;
3265
3266         INIT_LIST_HEAD(&msgs);
3267
3268         cfs_cpt_for_each(i, lnet_cpt_table()) {
3269                 lnet_net_lock(i);
3270                 list_splice_init(the_lnet.ln_mt_resendqs[i], &msgs);
3271                 lnet_net_unlock(i);
3272                 list_for_each_entry_safe(msg, tmp, &msgs, msg_list) {
3273                         list_del_init(&msg->msg_list);
3274                         msg->msg_no_resend = true;
3275                         lnet_finalize(msg, -ESHUTDOWN);
3276                 }
3277         }
3278
3279         cfs_percpt_free(the_lnet.ln_mt_resendqs);
3280 }
3281
3282 static void
3283 lnet_recover_peer_nis(void)
3284 {
3285         struct lnet_mt_event_info *ev_info;
3286         struct list_head processed_list;
3287         struct list_head local_queue;
3288         struct lnet_handle_md mdh;
3289         struct lnet_peer_ni *lpni;
3290         struct lnet_peer_ni *tmp;
3291         lnet_nid_t nid;
3292         int healthv;
3293         int rc;
3294
3295         INIT_LIST_HEAD(&local_queue);
3296         INIT_LIST_HEAD(&processed_list);
3297
3298         /*
3299          * Always use cpt 0 for locking across all interactions with
3300          * ln_mt_peerNIRecovq
3301          */
3302         lnet_net_lock(0);
3303         list_splice_init(&the_lnet.ln_mt_peerNIRecovq,
3304                          &local_queue);
3305         lnet_net_unlock(0);
3306
3307         list_for_each_entry_safe(lpni, tmp, &local_queue,
3308                                  lpni_recovery) {
3309                 /*
3310                  * The same protection strategy is used here as in the
3311                  * local recovery case.
3312                  */
3313                 lnet_net_lock(0);
3314                 healthv = atomic_read(&lpni->lpni_healthv);
3315                 spin_lock(&lpni->lpni_lock);
3316                 if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
3317                     healthv == LNET_MAX_HEALTH_VALUE) {
3318                         list_del_init(&lpni->lpni_recovery);
3319                         lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, false);
3320                         spin_unlock(&lpni->lpni_lock);
3321                         lnet_peer_ni_decref_locked(lpni);
3322                         lnet_net_unlock(0);
3323                         continue;
3324                 }
3325
3326                 /*
3327                  * If the peer NI has failed recovery, we must unlink the
3328                  * md. But we want to keep the peer NI on the recovery
3329                  * queue so we can try to continue recovering it.
3330                  */
3331                 if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_FAILED) {
3332                         lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, true);
3333                         lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_FAILED;
3334                 }
3335
3336                 spin_unlock(&lpni->lpni_lock);
3337                 lnet_net_unlock(0);
3338
3339                 /*
3340                  * NOTE: we're racing with peer deletion from user space.
3341                  * It's possible that a peer is deleted after we check its
3342                  * state. In this case the recovery can create a new peer.
3343                  */
3344                 spin_lock(&lpni->lpni_lock);
3345                 if (!(lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) &&
3346                     !(lpni->lpni_state & LNET_PEER_NI_DELETING)) {
3347                         lpni->lpni_state |= LNET_PEER_NI_RECOVERY_PENDING;
3348                         spin_unlock(&lpni->lpni_lock);
3349
3350                         LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
3351                         if (!ev_info) {
3352                                 CERROR("out of memory. Can't recover %s\n",
3353                                        libcfs_nid2str(lpni->lpni_nid));
3354                                 spin_lock(&lpni->lpni_lock);
3355                                 lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
3356                                 spin_unlock(&lpni->lpni_lock);
3357                                 continue;
3358                         }
3359
3360                         /* look at the comments in lnet_recover_local_nis() */
3361                         mdh = lpni->lpni_recovery_ping_mdh;
3362                         LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
3363                         nid = lpni->lpni_nid;
3364                         lnet_net_lock(0);
3365                         list_del_init(&lpni->lpni_recovery);
3366                         lnet_peer_ni_decref_locked(lpni);
3367                         lnet_net_unlock(0);
3368
3369                         ev_info->mt_type = MT_TYPE_PEER_NI;
3370                         ev_info->mt_nid = nid;
3371                         rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
3372                                             ev_info, the_lnet.ln_mt_eqh, true);
3373                         lnet_net_lock(0);
3374                         /*
3375                          * lnet_find_peer_ni_locked() grabs a refcount for
3376                          * us. No need to take it explicitly.
3377                          */
3378                         lpni = lnet_find_peer_ni_locked(nid);
3379                         if (!lpni) {
3380                                 lnet_net_unlock(0);
3381                                 LNetMDUnlink(mdh);
3382                                 continue;
3383                         }
3384
3385                         lpni->lpni_recovery_ping_mdh = mdh;
3386                         /*
3387                          * While we were unlocked, the lpni could've been
3388                          * re-added to the recovery queue. In this case we
3389                          * don't need to add it to the local queue, since
3390                          * it's already on there and the thread that added
3391                          * it would've incremented the refcount on the
3392                          * peer, which means we need to decref the refcount
3393                          * that was implicitly grabbed by find_peer_ni_locked.
3394                          * Otherwise, if the lpni is still not on
3395                          * the recovery queue, then we'll add it to the
3396                          * processed list.
3397                          */
3398                         if (list_empty(&lpni->lpni_recovery))
3399                                 list_add_tail(&lpni->lpni_recovery, &processed_list);
3400                         else
3401                                 lnet_peer_ni_decref_locked(lpni);
3402                         lnet_net_unlock(0);
3403
3404                         spin_lock(&lpni->lpni_lock);
3405                         if (rc)
3406                                 lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
3407                 }
3408                 spin_unlock(&lpni->lpni_lock);
3409         }
3410
3411         list_splice_init(&processed_list, &local_queue);
3412         lnet_net_lock(0);
3413         list_splice(&local_queue, &the_lnet.ln_mt_peerNIRecovq);
3414         lnet_net_unlock(0);
3415 }
3416
3417 static int
3418 lnet_monitor_thread(void *arg)
3419 {
3420         time64_t recovery_timeout = 0;
3421         time64_t rsp_timeout = 0;
3422         int interval;
3423         time64_t now;
3424
3425         /*
3426          * The monitor thread takes care of the following:
3427          *  1. Checks the aliveness of routers.
3428          *  2. Checks if there are messages on the resend queue and
3429          *     resends them.
3430          *  3. Checks if there are any NIs on the local recovery queue
3431          *     and pings them.
3432          *  4. Checks if there are any NIs on the remote recovery queue
3433          *     and pings them.
3434          */
3435         cfs_block_allsigs();
3436
3437         while (the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING) {
3438                 now = ktime_get_real_seconds();
3439
3440                 if (lnet_router_checker_active())
3441                         lnet_check_routers();
3442
3443                 lnet_resend_pending_msgs();
3444
3445                 if (now >= rsp_timeout) {
3446                         lnet_finalize_expired_responses(false);
3447                         rsp_timeout = now + (lnet_transaction_timeout / 2);
3448                 }
3449
3450                 if (now >= recovery_timeout) {
3451                         lnet_recover_local_nis();
3452                         lnet_recover_peer_nis();
3453                         recovery_timeout = now + lnet_recovery_interval;
3454                 }
3455
3456                 /*
3457                  * TODO do we need to check if we should sleep without
3458                  * timeout?  Technically, an active system will always
3459                  * have messages in flight so this check will always
3460                  * evaluate to false. And on an idle system do we care
3461                  * if we wake up every 1 second? Although, we've seen
3462                  * cases where we get a complaint that an idle thread
3463                  * is waking up unnecessarily.
3464                  *
3465                  * Take into account the current net_count when you wake
3466                  * up for alive router checking, since we need to check
3467                  * possibly as many networks as we have configured.
3468                  */
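                /*
                 * Sleep for the shortest of the three service intervals
                 * (recovery, per-net router checking, response timeouts)
                 * so that no deadline is missed.
                 */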
3469                 interval = min(lnet_recovery_interval,
3470                                min((unsigned int) alive_router_check_interval /
3471                                         lnet_current_net_count,
3472                                    lnet_transaction_timeout / 2));
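                /*
                 * The condition is always false, so we sleep for the full
                 * interval unless explicitly woken, e.g. by
                 * lnet_monitor_thr_stop().
                 */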
3473                 wait_event_interruptible_timeout(the_lnet.ln_mt_waitq,
3474                                                 false,
3475                                                 cfs_time_seconds(interval));
3476         }
3477
3478         /* Shutting down */
3479         lnet_net_lock(LNET_LOCK_EX);
3480         the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
3481         lnet_net_unlock(LNET_LOCK_EX);
3482
3483         /* signal that the monitor thread is exiting */
3484         up(&the_lnet.ln_mt_signal);
3485
3486         return 0;
3487 }
3488
3489 /*
3490  * lnet_send_ping
3491  * Sends a ping to the given NID.
3492  * Returns == 0 on success
3493  * Returns > 0 (positive errno) if buffer allocation or LNetMDBind fails
3494  * Returns < 0 if the destination NID is invalid or LNetGet fails
3495  */
3496 int
3497 lnet_send_ping(lnet_nid_t dest_nid,
3498                struct lnet_handle_md *mdh, int nnis,
3499                void *user_data, struct lnet_handle_eq eqh, bool recovery)
3500 {
3501         struct lnet_md md = { NULL };
3502         struct lnet_process_id id;
3503         struct lnet_ping_buffer *pbuf;
3504         int rc;
3505
3506         if (dest_nid == LNET_NID_ANY) {
3507                 rc = -EHOSTUNREACH;
3508                 goto fail_error;
3509         }
3510
3511         pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
3512         if (!pbuf) {
3513                 rc = ENOMEM; /* positive errno by design; see header comment */
3514                 goto fail_error;
3515         }
3516
3517         /* initialize md content */
3518         md.start     = &pbuf->pb_info;
3519         md.length    = LNET_PING_INFO_SIZE(nnis);
3520         md.threshold = 2; /* GET/REPLY */
3521         md.max_size  = 0;
3522         md.options   = LNET_MD_TRUNCATE;
3523         md.user_ptr  = user_data;
3524         md.eq_handle = eqh;
3525
3526         rc = LNetMDBind(md, LNET_UNLINK, mdh);
3527         if (rc) {
3528                 lnet_ping_buffer_decref(pbuf);
3529                 CERROR("Can't bind MD: %d\n", rc);
3530                 rc = -rc; /* change the rc to positive */
3531                 goto fail_error;
3532         }
3533         id.pid = LNET_PID_LUSTRE;
3534         id.nid = dest_nid;
3535
3536         rc = LNetGet(LNET_NID_ANY, *mdh, id,
3537                      LNET_RESERVED_PORTAL,
3538                      LNET_PROTO_PING_MATCHBITS, 0, recovery);
3539
3540         if (rc)
3541                 goto fail_unlink_md;
3542
3543         return 0;
3544
3545 fail_unlink_md:
3546         LNetMDUnlink(*mdh);
3547         LNetInvalidateMDHandle(mdh);
3548 fail_error:
3549         return rc;
3550 }
3551
3552 static void
3553 lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
3554                            int status, bool unlink_event)
3555 {
3556         lnet_nid_t nid = ev_info->mt_nid;
3557
3558         if (ev_info->mt_type == MT_TYPE_LOCAL_NI) {
3559                 struct lnet_ni *ni;
3560
3561                 lnet_net_lock(0);
3562                 ni = lnet_nid2ni_locked(nid, 0);
3563                 if (!ni) {
3564                         lnet_net_unlock(0);
3565                         return;
3566                 }
3567                 lnet_ni_lock(ni);
3568                 ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
3569                 if (status)
3570                         ni->ni_recovery_state |= LNET_NI_RECOVERY_FAILED;
3571                 lnet_ni_unlock(ni);
3572                 lnet_net_unlock(0);
3573
3574                 if (status != 0) {
3575                         CERROR("local NI (%s) recovery failed with %d\n",
3576                                libcfs_nid2str(nid), status);
3577                         return;
3578                 }
3579                 /*
3580                  * We need to increment healthv for the NI here, because
3581                  * in the lnet_finalize() path we don't have access to
3582                  * this NI, and getting access to it would require
3583                  * carrying forward too much information.
3584                  * In the peer case, it is naturally incremented.
3585                  */
3586                 if (!unlink_event)
3587                         lnet_inc_healthv(&ni->ni_healthv);
3588         } else {
3589                 struct lnet_peer_ni *lpni;
3590                 int cpt;
3591
3592                 cpt = lnet_net_lock_current();
3593                 lpni = lnet_find_peer_ni_locked(nid);
3594                 if (!lpni) {
3595                         lnet_net_unlock(cpt);
3596                         return;
3597                 }
3598                 spin_lock(&lpni->lpni_lock);
3599                 lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
3600                 if (status)
3601                         lpni->lpni_state |= LNET_PEER_NI_RECOVERY_FAILED;
3602                 spin_unlock(&lpni->lpni_lock);
3603                 lnet_peer_ni_decref_locked(lpni);
3604                 lnet_net_unlock(cpt);
3605
3606                 if (status != 0)
3607                         CERROR("peer NI (%s) recovery failed with %d\n",
3608                                libcfs_nid2str(nid), status);
3609         }
3610 }
3611
3612 void
3613 lnet_mt_event_handler(struct lnet_event *event)
3614 {
3615         struct lnet_mt_event_info *ev_info = event->md.user_ptr;
3616         struct lnet_ping_buffer *pbuf;
3617
3618         /* TODO: remove assert */
3619         LASSERT(event->type == LNET_EVENT_REPLY ||
3620                 event->type == LNET_EVENT_SEND ||
3621                 event->type == LNET_EVENT_UNLINK);
3622
3623         CDEBUG(D_NET, "Received event: %d status: %d\n", event->type,
3624                event->status);
3625
3626         switch (event->type) {
3627         case LNET_EVENT_UNLINK:
3628                 CDEBUG(D_NET, "%s recovery ping unlinked\n",
3629                        libcfs_nid2str(ev_info->mt_nid));
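                /* fallthrough */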
3630         case LNET_EVENT_REPLY:
3631                 lnet_handle_recovery_reply(ev_info, event->status,
3632                                            event->type == LNET_EVENT_UNLINK);
3633                 break;
3634         case LNET_EVENT_SEND:
3635                 CDEBUG(D_NET, "%s recovery message sent %s:%d\n",
3636                                libcfs_nid2str(ev_info->mt_nid),
3637                                (event->status) ? "unsuccessfully" :
3638                                "successfully", event->status);
3639                 break;
3640         default:
3641                 CERROR("Unexpected event: %d\n", event->type);
3642                 break;
3643         }
3644         if (event->unlinked) {
3645                 LIBCFS_FREE(ev_info, sizeof(*ev_info));
3646                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
3647                 lnet_ping_buffer_decref(pbuf);
3648         }
3649 }
3650
3651 static int
3652 lnet_rsp_tracker_create(void)
3653 {
3654         struct list_head **rstqs;
3655         rstqs = lnet_create_array_of_queues();
3656
3657         if (!rstqs)
3658                 return -ENOMEM;
3659
3660         the_lnet.ln_mt_rstq = rstqs;
3661
3662         return 0;
3663 }
3664
3665 static void
3666 lnet_rsp_tracker_clean(void)
3667 {
3668         lnet_finalize_expired_responses(true);
3669
3670         cfs_percpt_free(the_lnet.ln_mt_rstq);
3671         the_lnet.ln_mt_rstq = NULL;
3672 }
3673
3674 int lnet_monitor_thr_start(void)
3675 {
3676         int rc = 0;
3677         struct task_struct *task;
3678
3679         if (the_lnet.ln_mt_state != LNET_MT_STATE_SHUTDOWN)
3680                 return -EALREADY;
3681
3682         rc = lnet_resendqs_create();
3683         if (rc)
3684                 return rc;
3685
3686         rc = lnet_rsp_tracker_create();
3687         if (rc)
3688                 goto clean_queues;
3689
3690         sema_init(&the_lnet.ln_mt_signal, 0);
3691
3692         lnet_net_lock(LNET_LOCK_EX);
3693         the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING;
3694         lnet_net_unlock(LNET_LOCK_EX);
3695         task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread");
3696         if (IS_ERR(task)) {
3697                 rc = PTR_ERR(task);
3698                 CERROR("Can't start monitor thread: %d\n", rc);
3699                 goto clean_thread;
3700         }
3701
3702         return 0;
3703
3704 clean_thread:
3705         lnet_net_lock(LNET_LOCK_EX);
3706         the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
3707         lnet_net_unlock(LNET_LOCK_EX);
3708         /* block until the monitor thread signals exit */
3709         down(&the_lnet.ln_mt_signal);
3710         /* clean up */
3711         lnet_net_lock(LNET_LOCK_EX);
3712         the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
3713         lnet_net_unlock(LNET_LOCK_EX);
3714         lnet_rsp_tracker_clean();
3715         lnet_clean_local_ni_recoveryq();
3716         lnet_clean_peer_ni_recoveryq();
3717         lnet_clean_resendqs();
3718         LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
3719         return rc;
3720 clean_queues:
3721         lnet_rsp_tracker_clean();
3722         lnet_clean_local_ni_recoveryq();
3723         lnet_clean_peer_ni_recoveryq();
3724         lnet_clean_resendqs();
3725         return rc;
3726 }
3727
3728 void lnet_monitor_thr_stop(void)
3729 {
3730         if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
3731                 return;
3732
3733         LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
3734         lnet_net_lock(LNET_LOCK_EX);
3735         the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
3736         lnet_net_unlock(LNET_LOCK_EX);
3737
3738         /* tell the monitor thread that we're shutting down */
3739         wake_up(&the_lnet.ln_mt_waitq);
3740
3741         /* block until monitor thread signals that it's done */
3742         down(&the_lnet.ln_mt_signal);
3743         LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN);
3744
3745         /* perform cleanup tasks */
3746         lnet_rsp_tracker_clean();
3747         lnet_clean_local_ni_recoveryq();
3748         lnet_clean_peer_ni_recoveryq();
3749         lnet_clean_resendqs();
3750
3751         return;
3752 }
3753
3754 void
3755 lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob,
3756                   __u32 msg_type)
3757 {
3758         lnet_net_lock(cpt);
3759         lnet_incr_stats(&ni->ni_stats, msg_type, LNET_STATS_TYPE_DROP);
3760         the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
3761         the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length += nob;
3762         lnet_net_unlock(cpt);
3763
3764         lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
3765 }
3766
3767 static void
3768 lnet_recv_put(struct lnet_ni *ni, struct lnet_msg *msg)
3769 {
3770         struct lnet_hdr *hdr = &msg->msg_hdr;
3771